You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:21:56 UTC
[13/47] git commit: updated refs/heads/release-1.1 to 4c139ee
GIRAPH-912: Support succinct representation of messages in messagestores (pavanka)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f31e9a32
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f31e9a32
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f31e9a32
Branch: refs/heads/release-1.1
Commit: f31e9a328d3b4f10906a10a8e69d2ae515e3aba0
Parents: 61cb37e
Author: Pavan Kumar <pa...@fb.com>
Authored: Wed Jul 9 17:08:48 2014 -0700
Committer: Pavan Kumar <pa...@fb.com>
Committed: Wed Jul 9 17:08:48 2014 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../giraph/comm/SendMessageToAllCache.java | 308 -------------------
.../giraph/comm/SendOneMessageToManyCache.java | 306 ++++++++++++++++++
.../java/org/apache/giraph/comm/ServerData.java | 2 +
.../messages/AbstractListPerVertexStore.java | 103 +++++++
.../ByteArrayMessagesPerVertexStore.java | 17 +-
.../messages/InMemoryMessageStoreFactory.java | 114 +++++--
.../messages/MessageEncodeAndStoreType.java | 59 ++++
.../giraph/comm/messages/MessageStore.java | 15 +
.../comm/messages/OneMessagePerVertexStore.java | 5 +
.../messages/PointerListMessagesIterable.java | 105 +++++++
.../messages/PointerListPerVertexStore.java | 137 +++++++++
.../comm/messages/SimpleMessageStore.java | 4 +
.../out_of_core/DiskBackedMessageStore.java | 9 +
.../primitives/IntByteArrayMessageStore.java | 9 +
.../primitives/IntFloatMessageStore.java | 9 +
.../primitives/LongByteArrayMessageStore.java | 241 ---------------
.../primitives/LongDoubleMessageStore.java | 9 +
.../long_id/LongAbstractListMessageStore.java | 164 ++++++++++
.../long_id/LongAbstractMessageStore.java | 132 ++++++++
.../long_id/LongByteArrayMessageStore.java | 172 +++++++++++
.../long_id/LongPointerListMessageStore.java | 129 ++++++++
.../primitives/long_id/package-info.java | 22 ++
.../NettyWorkerClientRequestProcessor.java | 8 +-
.../giraph/comm/netty/NettyWorkerServer.java | 2 +-
.../giraph/comm/requests/RequestType.java | 6 +-
.../SendWorkerOneMessageToManyRequest.java | 156 ++++++++++
.../SendWorkerOneToAllMessagesRequest.java | 155 ----------
.../apache/giraph/conf/GiraphConfiguration.java | 17 +-
.../org/apache/giraph/conf/GiraphConstants.java | 20 +-
.../utils/ByteArrayOneMessageToManyIds.java | 105 +++++++
.../giraph/utils/ByteArrayOneToAllMessages.java | 168 ----------
.../utils/ByteStructVertexIdDataIterator.java | 9 +
.../ByteStructVertexIdMessageIterator.java | 10 +
.../utils/ExtendedByteArrayOutputBuffer.java | 155 ++++++++++
.../apache/giraph/utils/ExtendedDataOutput.java | 1 -
.../utils/OneMessageToManyIdsIterator.java | 143 +++++++++
.../apache/giraph/utils/UnsafeArrayReads.java | 2 +-
.../org/apache/giraph/utils/UnsafeReads.java | 2 +-
.../utils/UnsafeReusableByteArrayInput.java | 46 +++
.../giraph/utils/VertexIdDataIterator.java | 7 +
.../giraph/utils/VertexIdMessageIterator.java | 14 +
.../org/apache/giraph/comm/RequestTest.java | 14 +-
.../TestLongDoublePrimitiveMessageStores.java | 2 +-
44 files changed, 2160 insertions(+), 955 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 13dfcd7..0263749 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-912: Support succinct representation of messages in messagestores (pavanka)
+
GIRAPH-903: Detect crashes of Netty threads (edunov via pavanka)
GIRAPH-925: Unit tests should pass even if zookeeper port not available (edunov via pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
deleted file mode 100644
index 60858ea..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageToAllCache.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
-import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
-import org.apache.giraph.comm.requests.SendWorkerOneToAllMessagesRequest;
-import org.apache.giraph.comm.requests.WritableRequest;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayOneToAllMessages;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.PairList;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Aggregates the messages to be sent to workers so they can be sent
- * in bulk. Not thread-safe.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-public class SendMessageToAllCache<I extends WritableComparable,
- M extends Writable> extends SendMessageCache<I, M> {
- /** Class logger */
- private static final Logger LOG =
- Logger.getLogger(SendMessageToAllCache.class);
- /** Cache serialized messages for each worker */
- private final ByteArrayOneToAllMessages<I, M>[] oneToAllMsgCache;
- /** Tracking one-to-all message sizes for each worker */
- private final int[] oneToAllMsgSizes;
- /** Reused byte array to serialize target ids on each worker */
- private final ExtendedDataOutput[] idSerializer;
- /** Reused int array to count target id distribution */
- private final int[] idCounter;
- /**
- * Reused int array to record the partition id
- * of the first target vertex id found on the worker.
- */
- private final int[] firstPartitionMap;
- /** The WorkerInfo list */
- private final WorkerInfo[] workerInfoList;
-
- /**
- * Constructor
- *
- * @param conf Giraph configuration
- * @param serviceWorker Service worker
- * @param processor NettyWorkerClientRequestProcessor
- * @param maxMsgSize Max message size sent to a worker
- */
- public SendMessageToAllCache(ImmutableClassesGiraphConfiguration conf,
- CentralizedServiceWorker<?, ?, ?> serviceWorker,
- NettyWorkerClientRequestProcessor<I, ?, ?> processor,
- int maxMsgSize) {
- super(conf, serviceWorker, processor, maxMsgSize);
- int numWorkers = getNumWorkers();
- oneToAllMsgCache = new ByteArrayOneToAllMessages[numWorkers];
- oneToAllMsgSizes = new int[numWorkers];
- idSerializer = new ExtendedDataOutput[numWorkers];
- // InitialBufferSizes is alo initialized based on the number of workers.
- // As a result, initialBufferSizes is the same as idSerializer in length
- int initialBufferSize = 0;
- for (int i = 0; i < this.idSerializer.length; i++) {
- initialBufferSize = getSendWorkerInitialBufferSize(i);
- if (initialBufferSize > 0) {
- // InitialBufferSizes is from super class.
- // Each element is for one worker.
- idSerializer[i] = conf.createExtendedDataOutput(initialBufferSize);
- }
- }
- idCounter = new int[numWorkers];
- firstPartitionMap = new int[numWorkers];
- // Get worker info list.
- workerInfoList = new WorkerInfo[numWorkers];
- // Remember there could be null in the array.
- for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
- workerInfoList[workerInfo.getTaskId()] = workerInfo;
- }
- }
-
- /**
- * Reset ExtendedDataOutput array for id serialization
- * in next "one-to-all" message sending.
- */
- private void resetIdSerializers() {
- for (int i = 0; i < this.idSerializer.length; i++) {
- if (idSerializer[i] != null) {
- idSerializer[i].reset();
- }
- }
- }
-
- /**
- * Reset id counter for next "one-to-all" message sending.
- */
- private void resetIdCounter() {
- Arrays.fill(idCounter, 0);
- }
-
- /**
- * Add message with multiple ids to
- * one-to-all message cache.
- *
- * @param workerInfo The remote worker destination
- * @param ids A byte array to hold serialized vertex ids
- * @param idPos The end position of ids
- * information in the byte array above
- * @param count The number of target ids
- * @param message Message to send to remote worker
- * @return The size of messages for the worker.
- */
- private int addOneToAllMessage(
- WorkerInfo workerInfo, byte[] ids, int idPos, int count, M message) {
- // Get the data collection
- ByteArrayOneToAllMessages<I, M> workerData =
- oneToAllMsgCache[workerInfo.getTaskId()];
- if (workerData == null) {
- workerData = new ByteArrayOneToAllMessages<I, M>(
- getConf().getOutgoingMessageValueFactory());
- workerData.setConf(getConf());
- workerData.initialize(getSendWorkerInitialBufferSize(
- workerInfo.getTaskId()));
- oneToAllMsgCache[workerInfo.getTaskId()] = workerData;
- }
- workerData.add(ids, idPos, count, message);
- // Update the size of cached, outgoing data per worker
- oneToAllMsgSizes[workerInfo.getTaskId()] =
- workerData.getSize();
- return oneToAllMsgSizes[workerInfo.getTaskId()];
- }
-
- /**
- * Gets the one-to-all
- * messages for a worker and removes it from the cache.
- * Here the ByteArrayOneToAllMessages returned could be null.
- * But when invoking this method, we also check if the data size sent
- * to this worker is above the threshold. Therefore, it doesn't matter
- * if the result is null or not.
- *
- * @param workerInfo The target worker where one-to-all messages
- * go to.
- * @return ByteArrayOneToAllMessages that belong to the workerInfo
- */
- private ByteArrayOneToAllMessages<I, M>
- removeWorkerOneToAllMessages(WorkerInfo workerInfo) {
- ByteArrayOneToAllMessages<I, M> workerData =
- oneToAllMsgCache[workerInfo.getTaskId()];
- if (workerData != null) {
- oneToAllMsgCache[workerInfo.getTaskId()] = null;
- oneToAllMsgSizes[workerInfo.getTaskId()] = 0;
- }
- return workerData;
- }
-
- /**
- * Gets all the one-to-all
- * messages and removes them from the cache.
- *
- * @return All vertex messages for all workers
- */
- private PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>>
- removeAllOneToAllMessages() {
- PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>> allData =
- new PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>>();
- allData.initialize(oneToAllMsgCache.length);
- for (WorkerInfo workerInfo : getWorkerPartitions().keySet()) {
- ByteArrayOneToAllMessages<I, M> workerData =
- removeWorkerOneToAllMessages(workerInfo);
- if (workerData != null && !workerData.isEmpty()) {
- allData.add(workerInfo, workerData);
- }
- }
- return allData;
- }
-
- @Override
- public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
- // This is going to be reused through every message sending
- resetIdSerializers();
- resetIdCounter();
- // Count messages
- int currentMachineId = 0;
- PartitionOwner owner = null;
- WorkerInfo workerInfo = null;
- I vertexId = null;
- while (vertexIdIterator.hasNext()) {
- vertexId = vertexIdIterator.next();
- owner = getServiceWorker().getVertexPartitionOwner(vertexId);
- workerInfo = owner.getWorkerInfo();
- currentMachineId = workerInfo.getTaskId();
- // Serialize this target vertex id
- try {
- vertexId.write(idSerializer[currentMachineId]);
- } catch (IOException e) {
- throw new IllegalStateException(
- "Failed to serialize the target vertex id.");
- }
- idCounter[currentMachineId]++;
- // Record the first partition id in the worker which message send to.
- // If idCounter shows there is only one target on this worker
- // then this is the partition number of the target vertex.
- if (idCounter[currentMachineId] == 1) {
- firstPartitionMap[currentMachineId] = owner.getPartitionId();
- }
- }
- // Add the message to the cache
- int idSerializerPos = 0;
- int workerMessageSize = 0;
- byte[] serializedId = null;
- WritableRequest writableRequest = null;
- for (int i = 0; i < idCounter.length; i++) {
- if (idCounter[i] == 1) {
- serializedId = idSerializer[i].getByteArray();
- idSerializerPos = idSerializer[i].getPos();
- // Add the message to the cache
- workerMessageSize = addMessage(workerInfoList[i],
- firstPartitionMap[i], serializedId, idSerializerPos, message);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("sendMessageToAllRequest: Send bytes (" +
- message.toString() + ") to one target in worker " +
- workerInfoList[i]);
- }
- ++totalMsgsSentInSuperstep;
- if (workerMessageSize >= maxMessagesSizePerWorker) {
- PairList<Integer, VertexIdMessages<I, M>>
- workerMessages = removeWorkerMessages(workerInfoList[i]);
- writableRequest =
- new SendWorkerMessagesRequest<I, M>(workerMessages);
- totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
- clientProcessor.doRequest(workerInfoList[i], writableRequest);
- // Notify sending
- getServiceWorker().getGraphTaskManager().notifySentMessages();
- }
- } else if (idCounter[i] > 1) {
- serializedId = idSerializer[i].getByteArray();
- idSerializerPos = idSerializer[i].getPos();
- workerMessageSize = addOneToAllMessage(
- workerInfoList[i], serializedId, idSerializerPos, idCounter[i],
- message);
-
- if (LOG.isTraceEnabled()) {
- LOG.trace("sendMessageToAllRequest: Send bytes (" +
- message.toString() + ") to all targets in worker" +
- workerInfoList[i]);
- }
- totalMsgsSentInSuperstep += idCounter[i];
- if (workerMessageSize >= maxMessagesSizePerWorker) {
- ByteArrayOneToAllMessages<I, M> workerOneToAllMessages =
- removeWorkerOneToAllMessages(workerInfoList[i]);
- writableRequest =
- new SendWorkerOneToAllMessagesRequest<I, M>(
- workerOneToAllMessages, getConf());
- totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
- clientProcessor.doRequest(workerInfoList[i], writableRequest);
- // Notify sending
- getServiceWorker().getGraphTaskManager().notifySentMessages();
- }
- }
- }
- }
-
- @Override
- public void flush() {
- super.flush();
- PairList<WorkerInfo, ByteArrayOneToAllMessages<I, M>>
- remainingOneToAllMessageCache =
- removeAllOneToAllMessages();
- PairList<WorkerInfo,
- ByteArrayOneToAllMessages<I, M>>.Iterator
- oneToAllMsgIterator = remainingOneToAllMessageCache.getIterator();
- while (oneToAllMsgIterator.hasNext()) {
- oneToAllMsgIterator.next();
- WritableRequest writableRequest =
- new SendWorkerOneToAllMessagesRequest<I, M>(
- oneToAllMsgIterator.getCurrentSecond(), getConf());
- totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
- clientProcessor.doRequest(
- oneToAllMsgIterator.getCurrentFirst(), writableRequest);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
new file mode 100644
index 0000000..c67a20b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendOneMessageToManyCache.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
+import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
+import org.apache.giraph.comm.requests.WritableRequest;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.PairList;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Aggregates the messages to be sent to workers so they can be sent
+ * in bulk.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+@NotThreadSafe
+@SuppressWarnings("unchecked")
+public class SendOneMessageToManyCache<I extends WritableComparable,
+ M extends Writable> extends SendMessageCache<I, M> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(SendOneMessageToManyCache.class);
+ /** Cache serialized one to many messages for each worker */
+ private final ByteArrayOneMessageToManyIds<I, M>[] msgVidsCache;
+ /** Tracking message-vertexIds sizes for each worker */
+ private final int[] msgVidsSizes;
+ /** Reused byte array to serialize target ids on each worker */
+ private final ExtendedDataOutput[] idSerializer;
+ /** Reused int array to count target id distribution */
+ private final int[] idCounter;
+ /**
+ * Reused int array to record the partition id
+ * of the first target vertex id found on the worker.
+ */
+ private final int[] firstPartitionMap;
+ /** The WorkerInfo list */
+ private final WorkerInfo[] workerInfoList;
+
+ /**
+ * Constructor
+ *
+ * @param conf Giraph configuration
+ * @param serviceWorker Service worker
+ * @param processor NettyWorkerClientRequestProcessor
+ * @param maxMsgSize Max message size sent to a worker
+ */
+ public SendOneMessageToManyCache(ImmutableClassesGiraphConfiguration conf,
+ CentralizedServiceWorker<?, ?, ?> serviceWorker,
+ NettyWorkerClientRequestProcessor<I, ?, ?> processor,
+ int maxMsgSize) {
+ super(conf, serviceWorker, processor, maxMsgSize);
+ int numWorkers = getNumWorkers();
+ msgVidsCache = new ByteArrayOneMessageToManyIds[numWorkers];
+ msgVidsSizes = new int[numWorkers];
+ idSerializer = new ExtendedDataOutput[numWorkers];
+ // InitialBufferSizes is also initialized based on the number of workers.
+ // As a result, initialBufferSizes is the same as idSerializer in length
+ int initialBufferSize = 0;
+ for (int i = 0; i < this.idSerializer.length; i++) {
+ initialBufferSize = getSendWorkerInitialBufferSize(i);
+ if (initialBufferSize > 0) {
+ // InitialBufferSizes is from super class.
+ // Each element is for one worker.
+ idSerializer[i] = conf.createExtendedDataOutput(initialBufferSize);
+ }
+ }
+ idCounter = new int[numWorkers];
+ firstPartitionMap = new int[numWorkers];
+ // Get worker info list.
+ workerInfoList = new WorkerInfo[numWorkers];
+ // Remember there could be null in the array.
+ for (WorkerInfo workerInfo : serviceWorker.getWorkerInfoList()) {
+ workerInfoList[workerInfo.getTaskId()] = workerInfo;
+ }
+ }
+
+ /**
+ * Reset ExtendedDataOutput array for id serialization
+ * in next message-Vids encoding
+ */
+ private void resetIdSerializers() {
+ for (int i = 0; i < this.idSerializer.length; i++) {
+ if (idSerializer[i] != null) {
+ idSerializer[i].reset();
+ }
+ }
+ }
+
+ /**
+ * Reset id counter for next message-vertexIds encoding
+ */
+ private void resetIdCounter() {
+ Arrays.fill(idCounter, 0);
+ }
+
+ /**
+ * Add message with multiple target ids to message cache.
+ *
+ * @param workerInfo The remote worker destination
+ * @param ids A byte array to hold serialized vertex ids
+ * @param idPos The end position of ids
+ * information in the byte array above
+ * @param count The number of target ids
+ * @param message Message to send to remote worker
+ * @return The size of messages for the worker.
+ */
+ private int addOneToManyMessage(
+ WorkerInfo workerInfo, byte[] ids, int idPos, int count, M message) {
+ // Get the data collection
+ ByteArrayOneMessageToManyIds<I, M> workerData =
+ msgVidsCache[workerInfo.getTaskId()];
+ if (workerData == null) {
+ workerData = new ByteArrayOneMessageToManyIds<I, M>(
+ getConf().getOutgoingMessageValueFactory());
+ workerData.setConf(getConf());
+ workerData.initialize(getSendWorkerInitialBufferSize(
+ workerInfo.getTaskId()));
+ msgVidsCache[workerInfo.getTaskId()] = workerData;
+ }
+ workerData.add(ids, idPos, count, message);
+ // Update the size of cached, outgoing data per worker
+ msgVidsSizes[workerInfo.getTaskId()] =
+ workerData.getSize();
+ return msgVidsSizes[workerInfo.getTaskId()];
+ }
+
+ /**
+ * Gets the messages + vertexIds for a worker and removes it from the cache.
+ * Here the {@link org.apache.giraph.utils.ByteArrayOneMessageToManyIds}
+ * returned could be null. But when invoking this method, we also check if
+ * the data size sent to this worker is above the threshold.
+ * Therefore, it doesn't matter if the result is null or not.
+ *
+ * @param workerInfo Target worker to which one messages - many ids are sent
+ * @return {@link org.apache.giraph.utils.ByteArrayOneMessageToManyIds}
+ * that belong to the workerInfo
+ */
+ private ByteArrayOneMessageToManyIds<I, M>
+ removeWorkerMsgVids(WorkerInfo workerInfo) {
+ ByteArrayOneMessageToManyIds<I, M> workerData =
+ msgVidsCache[workerInfo.getTaskId()];
+ if (workerData != null) {
+ msgVidsCache[workerInfo.getTaskId()] = null;
+ msgVidsSizes[workerInfo.getTaskId()] = 0;
+ }
+ return workerData;
+ }
+
+ /**
+ * Gets all messages - vertexIds and removes them from the cache.
+ *
+ * @return All vertex messages for all workers
+ */
+ private PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>
+ removeAllMsgVids() {
+ PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>> allData =
+ new PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>();
+ allData.initialize(msgVidsCache.length);
+ for (WorkerInfo workerInfo : getWorkerPartitions().keySet()) {
+ ByteArrayOneMessageToManyIds<I, M> workerData =
+ removeWorkerMsgVids(workerInfo);
+ if (workerData != null && !workerData.isEmpty()) {
+ allData.add(workerInfo, workerData);
+ }
+ }
+ return allData;
+ }
+
+ @Override
+ public void sendMessageToAllRequest(Iterator<I> vertexIdIterator, M message) {
+ // This is going to be reused through every message sending
+ resetIdSerializers();
+ resetIdCounter();
+ // Count messages
+ int currentMachineId = 0;
+ PartitionOwner owner = null;
+ WorkerInfo workerInfo = null;
+ I vertexId = null;
+ while (vertexIdIterator.hasNext()) {
+ vertexId = vertexIdIterator.next();
+ owner = getServiceWorker().getVertexPartitionOwner(vertexId);
+ workerInfo = owner.getWorkerInfo();
+ currentMachineId = workerInfo.getTaskId();
+ // Serialize this target vertex id
+ try {
+ vertexId.write(idSerializer[currentMachineId]);
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "Failed to serialize the target vertex id.", e);
+ }
+ idCounter[currentMachineId]++;
+ // Record the first partition id in the worker which message send to.
+ // If idCounter shows there is only one target on this worker
+ // then this is the partition number of the target vertex.
+ if (idCounter[currentMachineId] == 1) {
+ firstPartitionMap[currentMachineId] = owner.getPartitionId();
+ }
+ }
+ // Add the message to the cache
+ int idSerializerPos = 0;
+ int workerMessageSize = 0;
+ byte[] serializedId = null;
+ WritableRequest writableRequest = null;
+ for (int i = 0; i < idCounter.length; i++) {
+ if (idCounter[i] == 1) {
+ serializedId = idSerializer[i].getByteArray();
+ idSerializerPos = idSerializer[i].getPos();
+ // Add the message to the cache
+ workerMessageSize = addMessage(workerInfoList[i],
+ firstPartitionMap[i], serializedId, idSerializerPos, message);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("sendMessageToAllRequest: Send bytes (" +
+ message.toString() + ") to one target in worker " +
+ workerInfoList[i]);
+ }
+ ++totalMsgsSentInSuperstep;
+ if (workerMessageSize >= maxMessagesSizePerWorker) {
+ PairList<Integer, VertexIdMessages<I, M>>
+ workerMessages = removeWorkerMessages(workerInfoList[i]);
+ writableRequest = new SendWorkerMessagesRequest<>(workerMessages);
+ totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
+ clientProcessor.doRequest(workerInfoList[i], writableRequest);
+ // Notify sending
+ getServiceWorker().getGraphTaskManager().notifySentMessages();
+ }
+ } else if (idCounter[i] > 1) {
+ serializedId = idSerializer[i].getByteArray();
+ idSerializerPos = idSerializer[i].getPos();
+ workerMessageSize = addOneToManyMessage(
+ workerInfoList[i], serializedId, idSerializerPos, idCounter[i],
+ message);
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("sendMessageToAllRequest: Send bytes (" +
+ message.toString() + ") to all targets in worker " +
+ workerInfoList[i]);
+ }
+ totalMsgsSentInSuperstep += idCounter[i];
+ if (workerMessageSize >= maxMessagesSizePerWorker) {
+ ByteArrayOneMessageToManyIds<I, M> workerMsgVids =
+ removeWorkerMsgVids(workerInfoList[i]);
+ writableRequest = new SendWorkerOneMessageToManyRequest<>(
+ workerMsgVids, getConf());
+ totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
+ clientProcessor.doRequest(workerInfoList[i], writableRequest);
+ // Notify sending
+ getServiceWorker().getGraphTaskManager().notifySentMessages();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void flush() {
+ super.flush();
+ PairList<WorkerInfo, ByteArrayOneMessageToManyIds<I, M>>
+ remainingMsgVidsCache = removeAllMsgVids();
+ PairList<WorkerInfo,
+ ByteArrayOneMessageToManyIds<I, M>>.Iterator
+ msgIdsIterator = remainingMsgVidsCache.getIterator();
+ while (msgIdsIterator.hasNext()) {
+ msgIdsIterator.next();
+ WritableRequest writableRequest =
+ new SendWorkerOneMessageToManyRequest<>(
+ msgIdsIterator.getCurrentSecond(), getConf());
+ totalMsgBytesSentInSuperstep += writableRequest.getSerializedSize();
+ clientProcessor.doRequest(
+ msgIdsIterator.getCurrentFirst(), writableRequest);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
index b3f8733..85bfe04 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/ServerData.java
@@ -176,6 +176,8 @@ public class ServerData<I extends WritableComparable,
messageStoreFactory.newStore(conf.getIncomingMessageValueFactory());
incomingMessageStore =
messageStoreFactory.newStore(conf.getOutgoingMessageValueFactory());
+ // finalize current message-store before resolving mutations
+ currentMessageStore.finalizeStore();
currentWorkerToWorkerMessages = incomingWorkerToWorkerMessages;
incomingWorkerToWorkerMessages =
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
new file mode 100644
index 0000000..6840f86
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/AbstractListPerVertexStore.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.VertexIdIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Abstract implementation of {@link SimpleMessageStore} where
+ * multiple messages are stored per vertex as a list.
+ * Used when there is no combiner provided.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ * @param <L> List type used to store each vertex's messages
+ */
+public abstract class AbstractListPerVertexStore<I extends WritableComparable,
+ M extends Writable, L extends List> extends SimpleMessageStore<I, M, L> {
+
+ /**
+ * Constructor
+ *
+ * @param messageValueFactory Message class held in the store
+ * @param service Service worker
+ * @param config Hadoop configuration
+ */
+ public AbstractListPerVertexStore(
+ MessageValueFactory<M> messageValueFactory,
+ CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+ super(messageValueFactory, service, config);
+ }
+
+ /**
+ * Create a new list for a vertex that has no list in the store yet.
+ * @return new instance of L
+ */
+ protected abstract L createList();
+
+ /**
+ * Get the list for the vertex the iterator is currently at; if absent,
+ * create one via {@link #createList()} and register it with putIfAbsent,
+ * releasing the iterator's current vertex id to use as the map key.
+ *
+ * @param iterator vertex id iterator
+ * @return list stored for the iterator's current vertex
+ */
+ protected L getOrCreateList(VertexIdIterator<I> iterator) {
+ PartitionOwner owner =
+ service.getVertexPartitionOwner(iterator.getCurrentVertexId());
+ int partitionId = owner.getPartitionId();
+ ConcurrentMap<I, L> partitionMap = getOrCreatePartitionMap(partitionId);
+ L list = partitionMap.get(iterator.getCurrentVertexId());
+ if (list == null) {
+ L newList = createList();
+ list = partitionMap.putIfAbsent(
+ iterator.releaseCurrentVertexId(), newList);
+ if (list == null) {
+ list = newList;
+ }
+ }
+ return list;
+ }
+
+ @Override
+ public Iterable<M> getVertexMessages(I vertexId) throws IOException {
+ ConcurrentMap<I, L> partitionMap =
+ map.get(getPartitionId(vertexId));
+ if (partitionMap == null) {
+ return Collections.<M>emptyList();
+ }
+ L list = partitionMap.get(vertexId);
+ return list == null ? Collections.<M>emptyList() :
+ getMessagesAsIterable(list);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
index 65939bb..57d255f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/ByteArrayMessagesPerVertexStore.java
@@ -62,6 +62,11 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
super(messageValueFactory, service, config);
}
+ @Override
+ public boolean isPointerListEncoding() {
+ return false; // messages are kept as serialized bytes per vertex
+ }
+
/**
* Get the extended data output for a vertex id from the iterator, creating
* if necessary. This method will take ownership of the vertex id from the
@@ -89,12 +94,10 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
@Override
public void addPartitionMessages(
- int partitionId,
- VertexIdMessages<I, M> messages) throws IOException {
+ int partitionId, VertexIdMessages<I, M> messages) throws IOException {
ConcurrentMap<I, DataInputOutput> partitionMap =
getOrCreatePartitionMap(partitionId);
- VertexIdMessageBytesIterator<I, M>
- vertexIdMessageBytesIterator =
+ VertexIdMessageBytesIterator<I, M> vertexIdMessageBytesIterator =
messages.getVertexIdMessageBytesIterator();
// Try to copy the message buffer over rather than
// doing a deserialization of a message just to know its size. This
@@ -113,8 +116,8 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
}
}
} else {
- VertexIdMessageIterator<I, M>
- vertexIdMessageIterator = messages.getVertexIdMessageIterator();
+ VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+ messages.getVertexIdMessageIterator();
while (vertexIdMessageIterator.hasNext()) {
vertexIdMessageIterator.next();
DataInputOutput dataInputOutput =
@@ -188,7 +191,7 @@ public class ByteArrayMessagesPerVertexStore<I extends WritableComparable,
* @param <M> Message data
*/
private static class Factory<I extends WritableComparable, M extends Writable>
- implements MessageStoreFactory<I, M, MessageStore<I, M>> {
+ implements MessageStoreFactory<I, M, MessageStore<I, M>> {
/** Service worker */
private CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
index f691d3e..db22503 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/InMemoryMessageStoreFactory.java
@@ -22,8 +22,10 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
-import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
+import org.apache.giraph.comm.messages.primitives.long_id.LongPointerListMessageStore;
+import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.hadoop.io.DoubleWritable;
@@ -43,6 +45,7 @@ import org.apache.log4j.Logger;
* @param <I> Vertex id
* @param <M> Message data
*/
+@SuppressWarnings("unchecked")
public class InMemoryMessageStoreFactory<I extends WritableComparable,
M extends Writable>
implements MessageStoreFactory<I, M, MessageStore<I, M>> {
@@ -51,9 +54,9 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
Logger.getLogger(InMemoryMessageStoreFactory.class);
/** Service worker */
- private CentralizedServiceWorker<I, ?, ?> service;
+ protected CentralizedServiceWorker<I, ?, ?> service;
/** Hadoop configuration */
- private ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
+ protected ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
/**
* Default constructor allowing class invocation via Reflection.
@@ -61,46 +64,89 @@ public class InMemoryMessageStoreFactory<I extends WritableComparable,
public InMemoryMessageStoreFactory() {
}
- @Override
- public MessageStore<I, M> newStore(
- MessageValueFactory<M> messageValueFactory) {
+ /**
+ * Create the store used with a combiner: OneMessagePerVertexStore unless
+ * (id, message) is (IntWritable, FloatWritable) or (LongWritable, DoubleWritable)
+ * @param messageValueFactory message value factory
+ * @return message store
+ */
+ protected MessageStore<I, M> newStoreWithCombiner(
+ MessageValueFactory<M> messageValueFactory) {
Class<M> messageClass = messageValueFactory.getValueClass();
MessageStore messageStore;
- if (conf.useMessageCombiner()) {
- Class<I> vertexIdClass = conf.getVertexIdClass();
- if (vertexIdClass.equals(IntWritable.class) &&
- messageClass.equals(FloatWritable.class)) {
- messageStore = new IntFloatMessageStore(
- (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
- (MessageCombiner<IntWritable, FloatWritable>)
- conf.<FloatWritable>createMessageCombiner());
- } else if (vertexIdClass.equals(LongWritable.class) &&
- messageClass.equals(DoubleWritable.class)) {
- messageStore = new LongDoubleMessageStore(
+ Class<I> vertexIdClass = conf.getVertexIdClass();
+ if (vertexIdClass.equals(IntWritable.class) &&
+ messageClass.equals(FloatWritable.class)) {
+ messageStore = new IntFloatMessageStore(
+ (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
+ (MessageCombiner<IntWritable, FloatWritable>)
+ conf.<FloatWritable>createMessageCombiner());
+ } else if (vertexIdClass.equals(LongWritable.class) &&
+ messageClass.equals(DoubleWritable.class)) {
+ messageStore = new LongDoubleMessageStore(
(CentralizedServiceWorker<LongWritable, Writable, Writable>) service,
(MessageCombiner<LongWritable, DoubleWritable>)
conf.<DoubleWritable>createMessageCombiner());
- } else {
- messageStore = new OneMessagePerVertexStore<I, M>(messageValueFactory,
+ } else {
+ messageStore = new OneMessagePerVertexStore(messageValueFactory,
service, conf.<M>createMessageCombiner(), conf);
+ }
+ return messageStore;
+ }
+
+ /**
+ * Create the store used without a combiner; int ids always get an
+ * IntByteArrayMessageStore, otherwise MESSAGE_ENCODE_AND_STORE_TYPE decides
+ * @param messageValueFactory message value factory
+ * @return message store, or null if the type matches no handled value
+ */
+ protected MessageStore<I, M> newStoreWithoutCombiner(
+ MessageValueFactory<M> messageValueFactory) {
+ MessageStore messageStore = null;
+ MessageEncodeAndStoreType encodeAndStore = GiraphConstants
+ .MESSAGE_ENCODE_AND_STORE_TYPE.get(conf);
+ Class<I> vertexIdClass = conf.getVertexIdClass();
+ if (vertexIdClass.equals(IntWritable.class)) { // INT
+ messageStore = new IntByteArrayMessageStore(messageValueFactory,
+ service, conf);
+ } else if (vertexIdClass.equals(LongWritable.class)) { // LONG
+ if (encodeAndStore.equals(
+ MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
+ encodeAndStore.equals(
+ MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
+ messageStore = new LongByteArrayMessageStore(messageValueFactory,
+ service, conf);
+ } else if (encodeAndStore.equals(
+ MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
+ messageStore = new LongPointerListMessageStore(messageValueFactory,
+ service, conf);
}
- } else {
- Class<I> vertexIdClass = conf.getVertexIdClass();
- if (vertexIdClass.equals(IntWritable.class)) {
- messageStore = new IntByteArrayMessageStore<M>(messageValueFactory,
- (CentralizedServiceWorker<IntWritable, Writable, Writable>) service,
- (ImmutableClassesGiraphConfiguration<IntWritable, Writable, Writable>)
- conf);
- } else if (vertexIdClass.equals(LongWritable.class)) {
- messageStore = new LongByteArrayMessageStore<M>(messageValueFactory,
- (CentralizedServiceWorker<LongWritable, Writable, Writable>) service,
- (ImmutableClassesGiraphConfiguration<LongWritable, Writable,
- Writable>) conf);
- } else {
- messageStore = new ByteArrayMessagesPerVertexStore<I, M>(
- messageValueFactory, service, conf);
+ } else { // GENERAL
+ if (encodeAndStore.equals(
+ MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION) ||
+ encodeAndStore.equals(
+ MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION)) {
+ messageStore = new ByteArrayMessagesPerVertexStore<>(
+ messageValueFactory, service, conf);
+ } else if (encodeAndStore.equals(
+ MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) {
+ messageStore = new PointerListPerVertexStore(messageValueFactory,
+ service, conf);
}
}
+ return messageStore;
+ }
+
+ @Override
+ public MessageStore<I, M> newStore(
+ MessageValueFactory<M> messageValueFactory) {
+ Class<M> messageClass = messageValueFactory.getValueClass();
+ MessageStore messageStore;
+ if (conf.useMessageCombiner()) {
+ messageStore = newStoreWithCombiner(messageValueFactory);
+ } else {
+ messageStore = newStoreWithoutCombiner(messageValueFactory);
+ }
if (LOG.isInfoEnabled()) {
LOG.info("newStore: Created " + messageStore.getClass() +
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java
new file mode 100644
index 0000000..7a5b702
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageEncodeAndStoreType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+/**
+ * How messages are encoded and how the message store keeps them: as
+ * per-vertex pointer lists, or as (default) byte-array based storage.
+ */
+public enum MessageEncodeAndStoreType {
+ /**
+ * Store a per-vertex list of pointers to serialized messages
+ */
+ POINTER_LIST_PER_VERTEX(true),
+ /**
+ * Take one-message-to-many-ids encoded data and extract it into
+ * per-partition byte arrays for storage
+ */
+ EXTRACT_BYTEARRAY_PER_PARTITION(true),
+ /**
+ * Store messages in a byte array per partition
+ */
+ BYTEARRAY_PER_PARTITION(false);
+
+ /** Whether the one-message-to-many-ids encoding is used */
+ private final boolean oneToManyEncoding;
+
+ /**
+ * Create a constant
+ *
+ * @param oneToManyEncoding whether one-message-to-many-ids encoding is used
+ */
+ MessageEncodeAndStoreType(boolean oneToManyEncoding) {
+ this.oneToManyEncoding = oneToManyEncoding;
+ }
+
+ /**
+ * Whether this type uses the one-message-to-many-ids encoding
+ * @return true for the pointer-list and extract-byte-array types
+ */
+ public boolean useOneMessageToManyIdsEncoding() {
+ return oneToManyEncoding;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
index 7d0bbc6..6f1179a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/MessageStore.java
@@ -34,6 +34,14 @@ import org.apache.hadoop.io.WritableComparable;
public interface MessageStore<I extends WritableComparable,
M extends Writable> {
/**
+ * Whether this store keeps each vertex's messages as a list of long
+ * pointers into compact serialized message buffers
+ *
+ * @return true if messages are stored as a list of pointers
+ */
+ boolean isPointerListEncoding();
+
+ /**
* Gets messages for a vertex. The lifetime of every message is only
* guaranteed until the iterator's next() method is called. Do not hold
* references to objects returned by this iterator.
@@ -79,6 +87,13 @@ public interface MessageStore<I extends WritableComparable,
throws IOException;
/**
+ * Called before the start of computation in the BSP worker. It runs on a
+ * single thread while no other thread accesses the store, so
+ * implementations need no additional synchronization.
+ */
+ void finalizeStore();
+
+ /**
* Gets vertex ids from selected partition which we have messages for
*
* @param partitionId Id of partition
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
index 9bede06..d3942d4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/OneMessagePerVertexStore.java
@@ -63,6 +63,11 @@ public class OneMessagePerVertexStore<I extends WritableComparable,
}
@Override
+ public boolean isPointerListEncoding() {
+ return false; // holds a single message per vertex, not a pointer list
+ }
+
+ @Override
public void addPartitionMessages(
int partitionId,
VertexIdMessages<I, M> messages) throws IOException {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java
new file mode 100644
index 0000000..e5a1691
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListMessagesIterable.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongListIterator;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Iterable over messages addressed by a list of long pointers; each pointer
+ * holds a buffer index in its high 32 bits and a byte offset in its low 32.
+ * @param <M> message type
+ */
+public class PointerListMessagesIterable<M extends Writable>
+ implements Iterable<M> {
+ /** Factory for the reusable message instance */
+ private final MessageValueFactory<M> messageValueFactory;
+ /** Pointers (buffer index, byte offset) to serialized messages */
+ private final LongArrayList pointers;
+ /** Holds the byte arrays of serialized messages */
+ private final ExtendedByteArrayOutputBuffer msgBuffer;
+ /** Reader re-initialized for every message that is read */
+ private final UnsafeReusableByteArrayInput messageReader;
+
+ /**
+ * Constructor
+ * @param messageValueFactory message value factory
+ * @param pointers pointers to messages in buffer
+ * @param msgBuffer holds the byte arrays of serialized messages
+ */
+ public PointerListMessagesIterable(MessageValueFactory<M> messageValueFactory,
+ LongArrayList pointers, ExtendedByteArrayOutputBuffer msgBuffer) {
+ this.messageValueFactory = messageValueFactory;
+ this.pointers = pointers;
+ this.msgBuffer = msgBuffer;
+ // TODO - if needed implement same for Safe as well
+ messageReader = new UnsafeReusableByteArrayInput();
+ }
+
+ /**
+ * Create a new message instance from the factory
+ *
+ * @return message instance
+ */
+ protected M createMessage() {
+ return messageValueFactory.newInstance();
+ }
+
+ @Override
+ public Iterator<M> iterator() {
+ return new Iterator<M>() {
+ private final LongListIterator iterator = pointers.iterator();
+ // Single instance refilled and returned by every next() call
+ private final M reusableMsg = createMessage();
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public M next() {
+ long pointer = iterator.next();
+ try {
+ int index = (int) (pointer >>> 32);
+ int offset = (int) pointer;
+ ExtendedDataOutput buffer = msgBuffer.getDataOutput(index);
+ messageReader.initialize(buffer.getByteArray(), offset,
+ buffer.getPos());
+ reusableMsg.readFields(messageReader);
+ } catch (IOException e) {
+ throw new IllegalStateException("next: Failed to read message", e);
+ }
+ return reusableMsg;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
new file mode 100644
index 0000000..cce0439
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/PointerListPerVertexStore.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
+
+/**
+ * Implementation of {@link SimpleMessageStore} where multiple messages are
+ * stored as a list of long pointers to extended data output objects
+ * Used when there is no combiner provided.
+ *
+ * @param <I> vertexId type
+ * @param <M> message type
+ */
+public class PointerListPerVertexStore<I extends WritableComparable,
+ M extends Writable> extends AbstractListPerVertexStore<I, M, LongArrayList> {
+
+ /** Buffers of byte array outputs used to store messages - thread safe */
+ private final ExtendedByteArrayOutputBuffer bytesBuffer;
+
+ /**
+ * Constructor
+ *
+ * @param messageValueFactory Message class held in the store
+ * @param service Service worker
+ * @param config Hadoop configuration
+ */
+ public PointerListPerVertexStore(
+ MessageValueFactory<M> messageValueFactory,
+ CentralizedServiceWorker<I, ?, ?> service,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> config) {
+ super(messageValueFactory, service, config);
+ bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
+ }
+
+ @Override
+ public boolean isPointerListEncoding() {
+ return true;
+ }
+
+ @Override
+ protected LongArrayList createList() {
+ return new LongArrayList();
+ }
+
+ @Override
+ public void addPartitionMessages(
+ int partitionId, VertexIdMessages<I, M> messages) throws IOException {
+ VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+ messages.getVertexIdMessageIterator();
+ long pointer = 0;
+ LongArrayList list;
+ while (vertexIdMessageIterator.hasNext()) {
+ vertexIdMessageIterator.next();
+ M msg = vertexIdMessageIterator.getCurrentMessage();
+ list = getOrCreateList(vertexIdMessageIterator);
+ if (vertexIdMessageIterator.isNewMessage()) {
+ IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+ pointer = indexAndDataOut.getIndex();
+ pointer <<= 32;
+ ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+ pointer += dataOutput.getPos();
+ msg.write(dataOutput);
+ }
+ synchronized (list) {
+ list.add(pointer);
+ }
+ }
+ }
+
+ /**
+ * Get messages as an iterable from message storage
+ *
+ * @param pointers list of pointers to messages
+ * @return Messages as an iterable
+ */
+ @Override
+ public Iterable<M> getMessagesAsIterable(LongArrayList pointers) {
+ return new PointerListMessagesIterable<>(messageValueFactory, pointers,
+ bytesBuffer);
+ }
+
+ @Override
+ protected int getNumberOfMessagesIn(ConcurrentMap<I,
+ LongArrayList> partitionMap) {
+ int numberOfMessages = 0;
+ for (LongArrayList list : partitionMap.values()) {
+ numberOfMessages += list.size();
+ }
+ return numberOfMessages;
+ }
+
+ // FIXME -- complete these for check-pointing
+ @Override
+ protected void writeMessages(LongArrayList messages, DataOutput out)
+ throws IOException {
+
+ }
+
+ @Override
+ protected LongArrayList readFieldsForMessages(DataInput in)
+ throws IOException {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
index 13292a2..37b8c05 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/SimpleMessageStore.java
@@ -139,6 +139,10 @@ public abstract class SimpleMessageStore<I extends WritableComparable,
}
@Override
+ public void finalizeStore() { // no-op; subclasses may override
+ }
+
+ @Override
public Iterable<I> getPartitionDestinationVertices(int partitionId) {
ConcurrentMap<I, ?> partitionMap = map.get(partitionId);
return (partitionMap == null) ? Collections.<I>emptyList() :
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
index 18b7798..3000cd4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/out_of_core/DiskBackedMessageStore.java
@@ -83,6 +83,11 @@ public class DiskBackedMessageStore<I extends WritableComparable,
}
@Override
+ public boolean isPointerListEncoding() {
+ return false; // not a pointer-list encoded store
+ }
+
+ @Override
public void addPartitionMessages(
int partitionId,
VertexIdMessages<I, M> messages) throws IOException {
@@ -106,6 +111,10 @@ public class DiskBackedMessageStore<I extends WritableComparable,
}
@Override
+ public void finalizeStore() { // no-op for this store
+ }
+
+ @Override
public Iterable<M> getVertexMessages(I vertexId) throws IOException {
if (hasMessagesForVertex(vertexId)) {
return getMessageStore(vertexId).getVertexMessages(vertexId);
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
index dbc1ce8..0012bf0 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntByteArrayMessageStore.java
@@ -94,6 +94,11 @@ public class IntByteArrayMessageStore<M extends Writable>
}
}
+ @Override
+ public boolean isPointerListEncoding() {
+ return false; // not a pointer-list encoded store
+ }
+
/**
* Get map which holds messages for partition which vertex belongs to.
*
@@ -161,6 +166,10 @@ public class IntByteArrayMessageStore<M extends Writable>
}
@Override
+ public void finalizeStore() { // no-op for this store
+ }
+
+ @Override
public void clearPartition(int partitionId) throws IOException {
map.get(partitionId).clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index be75ee8..97086e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -86,6 +86,11 @@ public class IntFloatMessageStore
}
}
+ @Override
+ public boolean isPointerListEncoding() {
+ return false; // not a pointer-list encoded store
+ }
+
/**
* Get map which holds messages for partition which vertex belongs to.
*
@@ -126,6 +131,10 @@ public class IntFloatMessageStore
}
@Override
+ public void finalizeStore() { // no-op for this store
+ }
+
+ @Override
public void clearPartition(int partitionId) throws IOException {
map.get(partitionId).clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
deleted file mode 100644
index 3110864..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongByteArrayMessageStore.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.utils.VertexIdMessageBytesIterator;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.VerboseByteStructMessageWrite;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Lists;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.LongIterator;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * <p>Each vertex's messages are kept as serialized bytes in a
- * {@link DataInputOutput}, created lazily when the first message for that
- * vertex arrives.
- *
- * @param <M> Message type
- */
-public class LongByteArrayMessageStore<M extends Writable>
- implements MessageStore<LongWritable, M> {
- /** Message value factory */
- protected final MessageValueFactory<M> messageValueFactory;
- /** Map from partition id to map from vertex id to message */
- private final
- Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>> map;
- /** Service worker */
- private final CentralizedServiceWorker<LongWritable, ?, ?> service;
- /** Giraph configuration */
- private final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?> config;
-
- /**
- * Constructor
- *
- * @param messageValueFactory Factory for creating message values
- * @param service Service worker
- * @param config Hadoop configuration
- */
- public LongByteArrayMessageStore(
- MessageValueFactory<M> messageValueFactory,
- CentralizedServiceWorker<LongWritable, Writable, Writable> service,
- ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
- config) {
- this.messageValueFactory = messageValueFactory;
- this.service = service;
- this.config = config;
-
- map =
- new Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<DataInputOutput>>();
- // One vertex-id -> bytes map per partition, presized to its vertex count
- for (int partitionId : service.getPartitionStore().getPartitionIds()) {
- Partition<LongWritable, Writable, Writable> partition =
- service.getPartitionStore().getOrCreatePartition(partitionId);
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
- new Long2ObjectOpenHashMap<DataInputOutput>(
- (int) partition.getVertexCount());
- map.put(partitionId, partitionMap);
- // Hand back the partition obtained via getOrCreatePartition
- service.getPartitionStore().putPartition(partition);
- }
- }
-
- /**
- * Get map which holds messages for partition which vertex belongs to.
- *
- * @param vertexId Id of the vertex
- * @return Map which holds messages for partition which vertex belongs to.
- */
- private Long2ObjectOpenHashMap<DataInputOutput> getPartitionMap(
- LongWritable vertexId) {
- return map.get(service.getPartitionId(vertexId));
- }
-
- /**
- * Get the DataInputOutput for a vertex id, creating if necessary.
- *
- * @param partitionMap Partition map to look in
- * @param vertexId Id of the vertex
- * @return DataInputOutput for this vertex id (created if necessary)
- */
- private DataInputOutput getDataInputOutput(
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap,
- long vertexId) {
- DataInputOutput dataInputOutput = partitionMap.get(vertexId);
- if (dataInputOutput == null) {
- dataInputOutput = config.createMessagesInputOutput();
- partitionMap.put(vertexId, dataInputOutput);
- }
- return dataInputOutput;
- }
-
- @Override
- public void addPartitionMessages(int partitionId,
- VertexIdMessages<LongWritable, M> messages) throws
- IOException {
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
- map.get(partitionId);
- // Writers into the same partition map are serialized on the map itself
- synchronized (partitionMap) {
- VertexIdMessageBytesIterator<LongWritable, M>
- vertexIdMessageBytesIterator =
- messages.getVertexIdMessageBytesIterator();
- // Try to copy the message buffer over rather than
- // doing a deserialization of a message just to know its size. This
- // should be more efficient for complex objects where serialization is
- // expensive. If this type of iterator is not available, fall back to
- // deserializing/serializing the messages
- if (vertexIdMessageBytesIterator != null) {
- while (vertexIdMessageBytesIterator.hasNext()) {
- vertexIdMessageBytesIterator.next();
- DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
- vertexIdMessageBytesIterator.getCurrentVertexId().get());
- vertexIdMessageBytesIterator.writeCurrentMessageBytes(
- dataInputOutput.getDataOutput());
- }
- } else {
- VertexIdMessageIterator<LongWritable, M>
- iterator = messages.getVertexIdMessageIterator();
- while (iterator.hasNext()) {
- iterator.next();
- DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
- iterator.getCurrentVertexId().get());
- VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
- dataInputOutput.getDataOutput());
- }
- }
- }
- }
-
- @Override
- public void clearPartition(int partitionId) throws IOException {
- map.get(partitionId).clear();
- }
-
- @Override
- public boolean hasMessagesForVertex(LongWritable vertexId) {
- return getPartitionMap(vertexId).containsKey(vertexId.get());
- }
-
- @Override
- public Iterable<M> getVertexMessages(
- LongWritable vertexId) throws IOException {
- DataInputOutput dataInputOutput =
- getPartitionMap(vertexId).get(vertexId.get());
- if (dataInputOutput == null) {
- return EmptyIterable.get();
- } else {
- return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
- }
- }
-
- @Override
- public void clearVertexMessages(LongWritable vertexId) throws IOException {
- getPartitionMap(vertexId).remove(vertexId.get());
- }
-
- @Override
- public void clearAll() throws IOException {
- // NOTE(review): this removes the per-partition maps themselves, so
- // getPartitionMap returns null afterwards and per-vertex calls would
- // throw NullPointerException; presumably the store is discarded after
- // clearAll -- confirm.
- map.clear();
- }
-
- @Override
- public Iterable<LongWritable> getPartitionDestinationVertices(
- int partitionId) {
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
- map.get(partitionId);
- List<LongWritable> vertices =
- Lists.newArrayListWithCapacity(partitionMap.size());
- LongIterator iterator = partitionMap.keySet().iterator();
- while (iterator.hasNext()) {
- vertices.add(new LongWritable(iterator.nextLong()));
- }
- return vertices;
- }
-
- @Override
- public void writePartition(DataOutput out,
- int partitionId) throws IOException {
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
- map.get(partitionId);
- // Format: entry count, then (vertex id, serialized messages) per entry
- out.writeInt(partitionMap.size());
- ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
- partitionMap.long2ObjectEntrySet().fastIterator();
- while (iterator.hasNext()) {
- Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
- out.writeLong(entry.getLongKey());
- entry.getValue().write(out);
- }
- }
-
- @Override
- public void readFieldsForPartition(DataInput in,
- int partitionId) throws IOException {
- // Reads the format produced by writePartition
- int size = in.readInt();
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
- new Long2ObjectOpenHashMap<DataInputOutput>(size);
- while (size-- > 0) {
- long vertexId = in.readLong();
- DataInputOutput dataInputOutput = config.createMessagesInputOutput();
- dataInputOutput.readFields(in);
- partitionMap.put(vertexId, dataInputOutput);
- }
- // Insertion into the shared outer map is guarded, presumably because
- // several partitions may be read concurrently -- confirm.
- synchronized (map) {
- map.put(partitionId, partitionMap);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 264e65a..b0452c1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -83,6 +83,11 @@ public class LongDoubleMessageStore
}
}
+ @Override
+ public boolean isPointerListEncoding() {
+ // Constant: this store never reports pointer-list encoding.
+ return false;
+ }
+
/**
* Get map which holds messages for partition which vertex belongs to.
*
@@ -123,6 +128,10 @@ public class LongDoubleMessageStore
}
@Override
+ public void finalizeStore() {
+ // Intentionally empty: this store performs no finalization step.
+ }
+
+ @Override
public void clearPartition(int partitionId) throws IOException {
+ // Empties the partition's message map in place (the map object is kept).
map.get(partitionId).clear();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
new file mode 100644
index 0000000..ae61de4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.VertexIdIterator;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.util.List;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * <p>At construction time every vertex already present in a local partition
+ * is pre-registered in {@code map} with an empty list. {@link #getList} then
+ * only reads {@code map} for those vertices. Lists for vertex ids that are not
+ * known yet (nascent vertices) are created under a lock in a separate map.
+ * {@link #finalizeStore()} merges them into {@code map}.
+ *
+ * <p>Note: the constructor calls {@link #createList()}. Subclass
+ * implementations of it therefore must not rely on subclass instance fields,
+ * which are not yet initialized at that point.
+ *
+ * <p>NOTE(review): because every existing vertex is pre-registered, the
+ * inherited containsKey-based {@code hasMessagesForVertex} and
+ * {@code getPartitionDestinationVertices} also report vertices that received
+ * no messages. Confirm that callers tolerate this.
+ *
+ * @param <M> message type
+ * @param <L> list type
+ */
+public abstract class LongAbstractListMessageStore<M extends Writable,
+    L extends List> extends LongAbstractMessageStore<M, L> {
+  /**
+   * Map used to store messages for nascent vertices i.e., ones
+   * that did not exist at the start of current superstep but will get
+   * created because of sending message to a non-existent vertex id
+   */
+  private final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param service Service worker
+   * @param config Hadoop configuration
+   */
+  public LongAbstractListMessageStore(
+      MessageValueFactory<M> messageValueFactory,
+      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+      ImmutableClassesGiraphConfiguration<LongWritable,
+          Writable, Writable> config) {
+    super(messageValueFactory, service, config);
+    populateMap(service);
+
+    // create map for vertex ids (i.e., nascent vertices) not known yet
+    nascentMap = new Int2ObjectOpenHashMap<>();
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
+    }
+  }
+
+  /**
+   * Populate the map with an empty list for every vertex id already present
+   * in each partition.
+   *
+   * <p>Each partition is obtained with {@code getOrCreatePartition} and is
+   * always handed back with {@code putPartition}, the same pairing the
+   * superclass constructor uses.
+   *
+   * @param serviceWorker Service worker whose partition store is read
+   */
+  private void populateMap(
+      CentralizedServiceWorker<LongWritable, Writable, Writable>
+          serviceWorker) {
+    // TODO - can parallelize?
+    for (int partitionId :
+        serviceWorker.getPartitionStore().getPartitionIds()) {
+      Partition<LongWritable, Writable, Writable> partition =
+          serviceWorker.getPartitionStore().getOrCreatePartition(partitionId);
+      try {
+        Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
+        for (Vertex<LongWritable, ?, ?> vertex : partition) {
+          partitionMap.put(vertex.getId().get(), createList());
+        }
+      } finally {
+        // release the partition even if list creation fails
+        serviceWorker.getPartitionStore().putPartition(partition);
+      }
+    }
+  }
+
+  /**
+   * Create an instance of L
+   * @return instance of L
+   */
+  protected abstract L createList();
+
+  /**
+   * Get the list for the current vertexId, creating it if necessary.
+   *
+   * <p>Lists of pre-registered vertices are read from {@code map} without
+   * locking. Any other id is treated as a nascent vertex. Its list is created
+   * on first use in the nascent map, and access to that map is synchronized.
+   *
+   * @param iterator vertexId iterator
+   * @return list for current vertexId
+   */
+  protected L getList(
+      VertexIdIterator<LongWritable> iterator) {
+    PartitionOwner owner =
+        service.getVertexPartitionOwner(iterator.getCurrentVertexId());
+    long vertexId = iterator.getCurrentVertexId().get();
+    int partitionId = owner.getPartitionId();
+    // Single hash lookup on the hot path. Pre-registered entries are
+    // non-null as long as createList() never returns null.
+    L list = map.get(partitionId).get(vertexId);
+    if (list != null) {
+      return list;
+    }
+    synchronized (nascentMap) {
+      // assumption: not many nascent vertices are created
+      // so overall synchronization is negligible
+      Long2ObjectOpenHashMap<L> nascentPartitionMap =
+          nascentMap.get(partitionId);
+      list = nascentPartitionMap.get(vertexId);
+      if (list == null) {
+        list = createList();
+        nascentPartitionMap.put(vertexId, list);
+      }
+      return list;
+    }
+  }
+
+  /**
+   * Merge the lists of nascent vertices into {@code map}.
+   *
+   * <p>Each per-partition nascent map is replaced by a fresh empty map rather
+   * than removed. Its memory is released, and a later {@link #getList} call
+   * for an unknown vertex still finds a partition entry instead of throwing
+   * a NullPointerException.
+   */
+  @Override
+  public void finalizeStore() {
+    // iterate over a primitive snapshot of the keys, since entries are
+    // replaced inside the loop
+    for (int partitionId : nascentMap.keySet().toIntArray()) {
+      // nascent vertices are present only in nascent map
+      map.get(partitionId).putAll(nascentMap.get(partitionId));
+      nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
+    }
+  }
+
+  // TODO - discussion
+  /*
+  Some approaches for ensuring correctness with parallel inserts:
+  - current approach: uses a small extra bit of memory by pre-populating
+    map & pushes everything map cannot handle to nascentMap. At the
+    beginning of the next superstep's compute, a single-threaded
+    finalizeStore is called (so little extra memory + one sequential
+    finishing pass)
+  - use striped parallel fastutil maps instead (unsure of perf)
+  - use a concurrent map (every get gets far slower)
+  - use reader/writer locks (unsure of perf)
+    (code looks something like underneath)
+
+  private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+  rwl.readLock().lock();
+  L list = partitionMap.get(vertexId);
+  if (list == null) {
+    // the read lock must be released before acquiring the write lock
+    rwl.readLock().unlock();
+    rwl.writeLock().lock();
+    // re-check: another thread may have inserted the list meanwhile
+    list = partitionMap.get(vertexId);
+    if (list == null) {
+      list = createList();
+      partitionMap.put(vertexId, list);
+    }
+    // downgrade: acquire the read lock before releasing the write lock
+    rwl.readLock().lock();
+    rwl.writeLock().unlock();
+  }
+  rwl.readLock().unlock();
+  - adapted from the CachedData example in the ReentrantReadWriteLock
+    documentation:
+  http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
+  ReentrantReadWriteLock.html
+  */
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
new file mode 100644
index 0000000..9ee090e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import com.google.common.collect.Lists;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.partition.Partition;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Base class for message stores keyed by {@link LongWritable} vertex ids,
+ * used when no combiner is configured.
+ * Messages live in fastutil primitive maps: an int-keyed map from partition
+ * id to a long-keyed map from vertex id to the per-vertex message holder.
+ * This keeps the number of objects low and improves performance.
+ *
+ * @param <M> message type
+ * @param <T> datastructure used to hold messages
+ */
+public abstract class LongAbstractMessageStore<M extends Writable, T>
+    implements MessageStore<LongWritable, M> {
+  /** Factory used to instantiate message values */
+  protected final MessageValueFactory<M> messageValueFactory;
+  /** Partition id -> (vertex id -> messages held for that vertex) */
+  protected final Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
+  /** Worker service, used to resolve partition ids and partitions */
+  protected final CentralizedServiceWorker<LongWritable, ?, ?> service;
+  /** Giraph configuration */
+  protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
+      config;
+
+  /**
+   * Create the store with one empty message map for every partition id
+   * currently reported by the worker's partition store.
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param service Service worker
+   * @param config Hadoop configuration
+   */
+  public LongAbstractMessageStore(
+      MessageValueFactory<M> messageValueFactory,
+      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+      ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
+          config) {
+    this.messageValueFactory = messageValueFactory;
+    this.service = service;
+    this.config = config;
+    this.map = new Int2ObjectOpenHashMap<>();
+
+    for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+      Partition<LongWritable, Writable, Writable> partition =
+          service.getPartitionStore().getOrCreatePartition(partitionId);
+      // presize each per-partition map with the partition's vertex count
+      int expectedSize = (int) partition.getVertexCount();
+      map.put(partitionId, new Long2ObjectOpenHashMap<T>(expectedSize));
+      // hand back the partition obtained via getOrCreatePartition
+      service.getPartitionStore().putPartition(partition);
+    }
+  }
+
+  /**
+   * Look up the per-vertex message map of the partition owning a vertex.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for partition which vertex belongs to.
+   */
+  protected Long2ObjectOpenHashMap<T> getPartitionMap(LongWritable vertexId) {
+    int partitionId = service.getPartitionId(vertexId);
+    return map.get(partitionId);
+  }
+
+  @Override
+  public void clearPartition(int partitionId) throws IOException {
+    Long2ObjectOpenHashMap<T> partitionMap = map.get(partitionId);
+    partitionMap.clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    Long2ObjectOpenHashMap<T> partitionMap = getPartitionMap(vertexId);
+    return partitionMap.containsKey(vertexId.get());
+  }
+
+  @Override
+  public void clearVertexMessages(LongWritable vertexId) throws IOException {
+    Long2ObjectOpenHashMap<T> partitionMap = getPartitionMap(vertexId);
+    partitionMap.remove(vertexId.get());
+  }
+
+  @Override
+  public void clearAll() throws IOException {
+    // drops the per-partition maps themselves, not just their contents
+    map.clear();
+  }
+
+  @Override
+  public Iterable<LongWritable> getPartitionDestinationVertices(
+      int partitionId) {
+    Long2ObjectOpenHashMap<T> partitionMap = map.get(partitionId);
+    List<LongWritable> destinations =
+        Lists.newArrayListWithCapacity(partitionMap.size());
+    for (LongIterator ids = partitionMap.keySet().iterator(); ids.hasNext();) {
+      destinations.add(new LongWritable(ids.nextLong()));
+    }
+    return destinations;
+  }
+}