You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:21:55 UTC
[12/47] GIRAPH-912: Support succinct representation of messages in
messagestores (pavanka)
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
new file mode 100644
index 0000000..092d963
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.io.DataInputOutput;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> Message type
+ */
+public class LongByteArrayMessageStore<M extends Writable>
+ extends LongAbstractMessageStore<M, DataInputOutput> {
+
+ /**
+ * Constructor
+ *
+ * @param messageValueFactory Factory for creating message values
+ * @param service Service worker
+ * @param config Hadoop configuration
+ */
+ public LongByteArrayMessageStore(
+ MessageValueFactory<M> messageValueFactory,
+ CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+ ImmutableClassesGiraphConfiguration<LongWritable,
+ Writable, Writable> config) {
+ super(messageValueFactory, service, config);
+ }
+
+ @Override
+ public boolean isPointerListEncoding() {
+ return false;
+ }
+
+ /**
+ * Get the DataInputOutput for a vertex id, creating if necessary.
+ *
+ * @param partitionMap Partition map to look in
+ * @param vertexId Id of the vertex
+ * @return DataInputOutput for this vertex id (created if necessary)
+ */
+ private DataInputOutput getDataInputOutput(
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap, long vertexId) {
+ DataInputOutput dataInputOutput = partitionMap.get(vertexId);
+ if (dataInputOutput == null) {
+ dataInputOutput = config.createMessagesInputOutput();
+ partitionMap.put(vertexId, dataInputOutput);
+ }
+ return dataInputOutput;
+ }
+
+ @Override
+ public void addPartitionMessages(int partitionId,
+ VertexIdMessages<LongWritable, M> messages) throws IOException {
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId);
+ synchronized (partitionMap) {
+ VertexIdMessageBytesIterator<LongWritable, M>
+ vertexIdMessageBytesIterator =
+ messages.getVertexIdMessageBytesIterator();
+ // Try to copy the message buffer over rather than
+ // doing a deserialization of a message just to know its size. This
+ // should be more efficient for complex objects where serialization is
+ // expensive. If this type of iterator is not available, fall back to
+ // deserializing/serializing the messages
+ if (vertexIdMessageBytesIterator != null) {
+ while (vertexIdMessageBytesIterator.hasNext()) {
+ vertexIdMessageBytesIterator.next();
+ DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+ vertexIdMessageBytesIterator.getCurrentVertexId().get());
+ vertexIdMessageBytesIterator.writeCurrentMessageBytes(
+ dataInputOutput.getDataOutput());
+ }
+ } else {
+ VertexIdMessageIterator<LongWritable, M>
+ iterator = messages.getVertexIdMessageIterator();
+ while (iterator.hasNext()) {
+ iterator.next();
+ DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+ iterator.getCurrentVertexId().get());
+ VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+ dataInputOutput.getDataOutput());
+ }
+ }
+ }
+ }
+
+ @Override
+ public void finalizeStore() {
+ }
+
+ @Override
+ public Iterable<M> getVertexMessages(
+ LongWritable vertexId) throws IOException {
+ DataInputOutput dataInputOutput =
+ getPartitionMap(vertexId).get(vertexId.get());
+ if (dataInputOutput == null) {
+ return EmptyIterable.get();
+ } else {
+ return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
+ }
+ }
+
+ @Override
+ public void writePartition(DataOutput out, int partitionId)
+ throws IOException {
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+ map.get(partitionId);
+ out.writeInt(partitionMap.size());
+ ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
+ partitionMap.long2ObjectEntrySet().fastIterator();
+ while (iterator.hasNext()) {
+ Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
+ out.writeLong(entry.getLongKey());
+ entry.getValue().write(out);
+ }
+ }
+
+ @Override
+ public void readFieldsForPartition(DataInput in,
+ int partitionId) throws IOException {
+ int size = in.readInt();
+ Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+ new Long2ObjectOpenHashMap<DataInputOutput>(size);
+ while (size-- > 0) {
+ long vertexId = in.readLong();
+ DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+ dataInputOutput.readFields(in);
+ partitionMap.put(vertexId, dataInputOutput);
+ }
+ synchronized (map) {
+ map.put(partitionId, partitionMap);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
new file mode 100644
index 0000000..32296ad
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PointerListMessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
+
+/**
+ * Stores serialized messages in an
+ * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
+ * and keeps, per vertex id, a list of long pointers to those messages.
+ *
+ * @param <M> message type
+ */
+public class LongPointerListMessageStore<M extends Writable>
+ extends LongAbstractListMessageStore<M, LongArrayList>
+ implements MessageStore<LongWritable, M> {
+
+ /** Buffers of byte array outputs used to store messages - thread safe */
+ private final ExtendedByteArrayOutputBuffer bytesBuffer;
+
+ /**
+ * Constructor
+ *
+ * @param messageValueFactory Factory for creating message values
+ * @param service Service worker
+ * @param config Hadoop configuration
+ */
+ public LongPointerListMessageStore(
+ MessageValueFactory<M> messageValueFactory,
+ CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+ ImmutableClassesGiraphConfiguration<LongWritable,
+ Writable, Writable> config) {
+ super(messageValueFactory, service, config);
+ bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
+ }
+
+ @Override
+ public boolean isPointerListEncoding() {
+ return true;
+ }
+
+ @Override
+ protected LongArrayList createList() {
+ return new LongArrayList();
+ }
+
+ @Override
+ public void addPartitionMessages(int partitionId,
+ VertexIdMessages<LongWritable, M> messages) throws IOException {
+ VertexIdMessageIterator<LongWritable, M> iterator =
+ messages.getVertexIdMessageIterator();
+ long pointer = 0;
+ LongArrayList list;
+ while (iterator.hasNext()) {
+ iterator.next();
+ M msg = iterator.getCurrentMessage();
+ list = getList(iterator);
+ if (iterator.isNewMessage()) {
+ IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+ pointer = indexAndDataOut.getIndex();
+ pointer <<= 32;
+ ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+ pointer += dataOutput.getPos();
+ msg.write(dataOutput);
+ }
+ synchronized (list) { // TODO - any better way?
+ list.add(pointer);
+ }
+ }
+ }
+
+ @Override
+ public Iterable<M> getVertexMessages(
+ LongWritable vertexId) throws IOException {
+ LongArrayList list = getPartitionMap(vertexId).get(
+ vertexId.get());
+ if (list == null) {
+ return EmptyIterable.get();
+ } else {
+ return new PointerListMessagesIterable<>(messageValueFactory,
+ list, bytesBuffer);
+ }
+ }
+
+ // FIXME -- complete these for check-pointing
+ @Override
+ public void writePartition(DataOutput out, int partitionId)
+ throws IOException {
+ }
+
+ @Override
+ public void readFieldsForPartition(DataInput in, int partitionId)
+ throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java
new file mode 100644
index 0000000..121d1db
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Message store based off of primitives when I = LongWritable
+ */
+package org.apache.giraph.comm.messages.primitives.long_id;
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index ef3f824..f762f46 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -22,9 +22,9 @@ import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.util.PercentGauge;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.SendOneMessageToManyCache;
import org.apache.giraph.comm.SendEdgeCache;
import org.apache.giraph.comm.SendMessageCache;
-import org.apache.giraph.comm.SendMessageToAllCache;
import org.apache.giraph.comm.SendMutationsCache;
import org.apache.giraph.comm.SendPartitionCache;
import org.apache.giraph.comm.ServerData;
@@ -134,9 +134,9 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
maxVerticesSizePerWorker =
GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
- if (this.configuration.isOneToAllMsgSendingEnabled()) {
+ if (this.configuration.useOneMessageToManyIdsEncoding()) {
sendMessageCache =
- new SendMessageToAllCache<I, Writable>(conf, serviceWorker,
+ new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker,
this, maxMessagesSizePerWorker);
} else {
sendMessageCache =
@@ -395,7 +395,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
@Override
public void flush() throws IOException {
// Execute the remaining sends messages (if any)
- // including one-to-one and one-to-all messages.
+ // including individual and compact messages.
sendMessageCache.flush();
// Execute the remaining sends vertices (if any)
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 22ecc0e..81c892d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -122,7 +122,7 @@ public class NettyWorkerServer<I extends WritableComparable,
@Override
public void prepareSuperstep() {
- serverData.prepareSuperstep();
+ serverData.prepareSuperstep(); // updates the current message-store
resolveMutations();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index 408295c..c7561ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -38,9 +38,9 @@ public enum RequestType {
SEND_WORKER_VERTICES_REQUEST(SendWorkerVerticesRequest.class),
/** Sending a partition of messages for next superstep */
SEND_WORKER_MESSAGES_REQUEST(SendWorkerMessagesRequest.class),
- /** Sending one-to-all messages to a worker for next superstep */
- SEND_WORKER_ONETOALL_MESSAGES_REQUEST(
- SendWorkerOneToAllMessagesRequest.class),
+ /** Sending one message to many ids in a single request */
+ SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST(
+ SendWorkerOneMessageToManyRequest.class),
/**
* Sending a partition of messages for current superstep
* (used during partition exchange)
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
new file mode 100644
index 0000000..798ddfa
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Send a collection of ByteArrayOneMessageToManyIds messages to a worker.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+@SuppressWarnings("unchecked")
+public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
+ M extends Writable> extends WritableRequest<I, Writable, Writable>
+ implements WorkerRequest<I, Writable, Writable> {
+ /** ByteArrayOneMessageToManyIds encoding of vertex ids and messages */
+ protected ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds;
+
+ /**
+ * Constructor used for reflection only.
+ */
+ public SendWorkerOneMessageToManyRequest() { }
+
+ /**
+ * Constructor used to send request.
+ *
+ * @param oneMessageToManyIds ByteArrayOneMessageToManyIds
+ * @param conf ImmutableClassesGiraphConfiguration
+ */
+ public SendWorkerOneMessageToManyRequest(
+ ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds,
+ ImmutableClassesGiraphConfiguration conf) {
+ this.oneMessageToManyIds = oneMessageToManyIds;
+ setConf(conf);
+ }
+
+ @Override
+ public RequestType getType() {
+ return RequestType.SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST;
+ }
+
+ @Override
+ public void readFieldsRequest(DataInput input) throws IOException {
+ oneMessageToManyIds = new ByteArrayOneMessageToManyIds<I, M>(
+ getConf().<M>getOutgoingMessageValueFactory());
+ oneMessageToManyIds.setConf(getConf());
+ oneMessageToManyIds.readFields(input);
+ }
+
+ @Override
+ public void writeRequest(DataOutput output) throws IOException {
+ this.oneMessageToManyIds.write(output);
+ }
+
+ @Override
+ public int getSerializedSize() {
+ return super.getSerializedSize() +
+ this.oneMessageToManyIds.getSerializedSize();
+ }
+
+ @Override
+ public void doRequest(ServerData serverData) {
+ try {
+ MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
+ if (messageStore.isPointerListEncoding()) {
+ // if message store is pointer list based then send data as is
+ messageStore.addPartitionMessages(-1, oneMessageToManyIds);
+ } else { // else split the data per partition and send individually
+ CentralizedServiceWorker<I, ?, ?> serviceWorker =
+ serverData.getServiceWorker();
+ // Compute the initial buffer size of each per-partition
+ // ByteArrayVertexIdMessages on this worker. So that every
+ // ByteArrayVertexIdMessages has enough space to store its messages,
+ // we divide the size of the original ByteArrayOneMessageToManyIds
+ // by the number of partitions and then double the result.
+ // (We assume the major component of a ByteArrayOneMessageToManyIds
+ // is the id list. After splitting, each target id carries its own
+ // copy of the message, therefore we double the buffer size.)
+ // The result is the initial size of each ByteArrayVertexIdMessages.
+ int initialSize = oneMessageToManyIds.getSize() /
+ serverData.getPartitionStore().getNumPartitions() * 2;
+ // Create ByteArrayVertexIdMessages for
+ // message reformatting.
+ Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> partitionIdMsgs =
+ new Int2ObjectOpenHashMap<>();
+
+ // Put data from ByteArrayOneMessageToManyIds
+ // to ByteArrayVertexIdMessages
+ VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+ oneMessageToManyIds.getVertexIdMessageIterator();
+ while (vertexIdMessageIterator.hasNext()) {
+ vertexIdMessageIterator.next();
+ M msg = vertexIdMessageIterator.getCurrentMessage();
+ I vertexId = vertexIdMessageIterator.getCurrentVertexId();
+ PartitionOwner owner =
+ serviceWorker.getVertexPartitionOwner(vertexId);
+ int partitionId = owner.getPartitionId();
+ ByteArrayVertexIdMessages<I, M> idMsgs = partitionIdMsgs
+ .get(partitionId);
+ if (idMsgs == null) {
+ idMsgs = new ByteArrayVertexIdMessages<>(
+ getConf().<M>getOutgoingMessageValueFactory());
+ idMsgs.setConf(getConf());
+ idMsgs.initialize(initialSize);
+ partitionIdMsgs.put(partitionId, idMsgs);
+ }
+ idMsgs.add(vertexId, msg);
+ }
+
+ // Read ByteArrayVertexIdMessages and write to message store
+ for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
+ partitionIdMsgs.entrySet()) {
+ if (!idMsgs.getValue().isEmpty()) {
+ serverData.getIncomingMessageStore().addPartitionMessages(
+ idMsgs.getKey(), idMsgs.getValue());
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException("doRequest: Got IOException ", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
deleted file mode 100644
index 5f1ed53..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map.Entry;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayOneToAllMessages;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataInput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Send a collection of one-to-all messages to a worker.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-@SuppressWarnings("unchecked")
-public class SendWorkerOneToAllMessagesRequest<I extends WritableComparable,
- M extends Writable> extends WritableRequest<I, Writable, Writable>
- implements WorkerRequest<I, Writable, Writable> {
- /** The byte array of one-to-all messages */
- private ByteArrayOneToAllMessages<I, M> oneToAllMsgs;
-
- /**
- * Constructor used for reflection only.
- */
- public SendWorkerOneToAllMessagesRequest() { }
-
- /**
- * Constructor used to send request.
- *
- * @param oneToAllMsgs A byte array of all one-to-all messages
- * @param conf ImmutableClassesGiraphConfiguration
- */
- public SendWorkerOneToAllMessagesRequest(
- ByteArrayOneToAllMessages<I, M> oneToAllMsgs,
- ImmutableClassesGiraphConfiguration conf) {
- this.oneToAllMsgs = oneToAllMsgs;
- setConf(conf);
- }
-
- @Override
- public RequestType getType() {
- return RequestType.SEND_WORKER_ONETOALL_MESSAGES_REQUEST;
- }
-
- @Override
- public void readFieldsRequest(DataInput input) throws IOException {
- oneToAllMsgs = new ByteArrayOneToAllMessages<I, M>(
- getConf().<M>getOutgoingMessageValueFactory());
- oneToAllMsgs.setConf(getConf());
- oneToAllMsgs.readFields(input);
- }
-
- @Override
- public void writeRequest(DataOutput output) throws IOException {
- this.oneToAllMsgs.write(output);
- }
-
- @Override
- public int getSerializedSize() {
- return super.getSerializedSize() + this.oneToAllMsgs.getSerializedSize();
- }
-
- @Override
- public void doRequest(ServerData serverData) {
- CentralizedServiceWorker<I, ?, ?> serviceWorker =
- serverData.getServiceWorker();
- // Get the initial size of ByteArrayVertexIdMessages per partition
- // on this worker. To make sure every ByteArrayVertexIdMessages to have
- // enough space to store the messages, we divide the original one-to-all
- // message size by the number of partitions and double the size
- // (Assume the major component in one-to-all message is a id list.
- // Now each target id has a copy of message,
- // therefore we double the buffer size)
- // to get the initial size of ByteArrayVertexIdMessages.
- int initialSize = oneToAllMsgs.getSize() /
- serverData.getPartitionStore().getNumPartitions() * 2;
- // Create ByteArrayVertexIdMessages for
- // message reformatting.
- Int2ObjectOpenHashMap<ByteArrayVertexIdMessages>
- partitionIdMsgs =
- new Int2ObjectOpenHashMap<ByteArrayVertexIdMessages>();
-
- // Put data from ByteArrayOneToAllMessages to ByteArrayVertexIdMessages
- ExtendedDataInput reader = oneToAllMsgs.getOneToAllMessagesReader();
- I vertexId = getConf().createVertexId();
- M msg = oneToAllMsgs.createMessage();
- int idCount = 0;
- int partitionId = 0;
- try {
- while (!reader.endOfInput()) {
- msg.readFields(reader);
- idCount = reader.readInt();
- for (int i = 0; i < idCount; i++) {
- vertexId.readFields(reader);
- PartitionOwner owner =
- serviceWorker.getVertexPartitionOwner(vertexId);
- partitionId = owner.getPartitionId();
- ByteArrayVertexIdMessages<I, M> idMsgs =
- partitionIdMsgs.get(partitionId);
- if (idMsgs == null) {
- idMsgs = new ByteArrayVertexIdMessages<I, M>(
- getConf().<M>getOutgoingMessageValueFactory());
- idMsgs.setConf(getConf());
- idMsgs.initialize(initialSize);
- partitionIdMsgs.put(partitionId, idMsgs);
- }
- idMsgs.add(vertexId, msg);
- }
- }
- } catch (IOException e) {
- throw new RuntimeException("doRequest: Got IOException ", e);
- }
- // Read ByteArrayVertexIdMessages and write to message store
- try {
- for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
- partitionIdMsgs.entrySet()) {
- if (!idMsgs.getValue().isEmpty()) {
- serverData.getIncomingMessageStore().addPartitionMessages(
- idMsgs.getKey(), idMsgs.getValue());
- }
- }
- } catch (IOException e) {
- throw new RuntimeException("doRequest: Got IOException.", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index ee88b04..953f49f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -1176,23 +1176,18 @@ public class GiraphConfiguration extends Configuration
}
/**
- * Enable communication optimization for one-to-all messages.
- */
- public void enableOneToAllMsgSending() {
- ONE_TO_ALL_MSG_SENDING.set(this, true);
- }
-
- /**
- * Return if one-to-all messsage sending is enabled.
+ * Return whether oneMessageToManyIds encoding should be used
*
- * @return True if this option is enabled.
+ * @return True if this option is true.
*/
- public boolean isOneToAllMsgSendingEnabled() {
- return ONE_TO_ALL_MSG_SENDING.isTrue(this);
+ public boolean useOneMessageToManyIdsEncoding() {
+ return MESSAGE_ENCODE_AND_STORE_TYPE.get(this)
+ .useOneMessageToManyIdsEncoding();
}
/**
* Get option whether to create a source vertex present only in edge input
+ *
* @return CREATE_EDGE_SOURCE_VERTICES option
*/
public boolean getCreateSourceVertex() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 1879a25..ab0570f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -21,6 +21,7 @@ import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
import org.apache.giraph.comm.messages.MessageStoreFactory;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.EdgeStoreFactory;
@@ -556,12 +557,6 @@ public interface GiraphConstants {
new IntConfOption("giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB,
"How big to make the encoder buffer?");
- /** Whether or not netty request encoder should use direct byte buffers */
- BooleanConfOption NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
- new BooleanConfOption("giraph.nettyRequestEncoderUseDirectBuffers",
- false, "Whether or not netty request encoder " +
- "should use direct byte buffers");
-
/** Netty client threads */
IntConfOption NETTY_CLIENT_THREADS =
new IntConfOption("giraph.nettyClientThreads", 4, "Netty client threads");
@@ -1054,13 +1049,14 @@ public interface GiraphConstants {
"edges every time.");
/**
- * This option will enable communication optimization for one-to-all
- * message sending. For multiple target ids on the same machine,
- * we only send one message to all the targets.
+ * This option will tell which message encode & store enum to use when
+ * combining is not enabled
*/
- BooleanConfOption ONE_TO_ALL_MSG_SENDING =
- new BooleanConfOption("giraph.oneToAllMsgSending", false, "Enable " +
- "one-to-all message sending strategy");
+ EnumConfOption<MessageEncodeAndStoreType> MESSAGE_ENCODE_AND_STORE_TYPE =
+ EnumConfOption.create("giraph.messageEncodeAndStoreType",
+ MessageEncodeAndStoreType.class,
+ MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
+ "Select the message_encode_and_store_type to use");
/**
* This option can be used to specify if a source vertex present in edge
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java
new file mode 100644
index 0000000..674b0b0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Stores a message and a list of target vertex ids.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+@SuppressWarnings("unchecked")
+public class ByteArrayOneMessageToManyIds<I extends WritableComparable,
+ M extends Writable> extends ByteArrayVertexIdData<I, M>
+ implements VertexIdMessages<I, M> {
+ /** Factory that creates message value instances */
+ private MessageValueFactory<M> messageValueFactory;
+
+ /**
+ * Constructor.
+ *
+ * @param messageValueFactory Factory for creating messages
+ */
+ public ByteArrayOneMessageToManyIds(
+ MessageValueFactory<M> messageValueFactory) {
+ this.messageValueFactory = messageValueFactory;
+ }
+
+ @Override
+ public M createData() {
+ return messageValueFactory.newInstance();
+ }
+
+ @Override
+ public void writeData(ExtendedDataOutput out, M message) throws IOException {
+ message.write(out);
+ }
+
+ @Override
+ public void readData(ExtendedDataInput in, M message) throws IOException {
+ message.readFields(in);
+ }
+
+ /**
+ * Add a message.
+ * The serialized order is: message, id count, ids.
+ *
+ * @param ids The byte array which holds target ids
+ * of this message on the worker
+ * @param idPos The end position of the ids
+ * information in the byte array above.
+ * @param count The number of ids
+ * @param msg The message sent
+ */
+ public void add(byte[] ids, int idPos, int count, M msg) {
+ try {
+ msg.write(extendedDataOutput);
+ extendedDataOutput.writeInt(count);
+ extendedDataOutput.write(ids, 0, idPos);
+ } catch (IOException e) {
+ throw new IllegalStateException("add: IOException", e);
+ }
+ }
+
+ @Override
+ public void add(I vertexId, M data) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void add(byte[] serializedId, int idPos, M data) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public VertexIdMessageBytesIterator<I, M> getVertexIdMessageBytesIterator() {
+ return null;
+ }
+
+ @Override
+ public VertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
+ return new OneMessageToManyIdsIterator<>(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java
deleted file mode 100644
index f190c17..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Stores a message and a list of target vertex ids.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-@SuppressWarnings("unchecked")
-public class ByteArrayOneToAllMessages<
- I extends WritableComparable, M extends Writable>
- implements Writable, ImmutableClassesGiraphConfigurable {
- /** Extended data output */
- private ExtendedDataOutput extendedDataOutput;
- /** Configuration */
- private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration;
- /** Message value class */
- private MessageValueFactory<M> messageValueFactory;
-
- /**
- * Constructor.
- *
- * @param messageValueFactory Class for messages
- */
- public ByteArrayOneToAllMessages(
- MessageValueFactory<M> messageValueFactory) {
- this.messageValueFactory = messageValueFactory;
- }
-
- /**
- * Initialize the inner state. Must be called before {@code add()} is called.
- */
- public void initialize() {
- extendedDataOutput = configuration.createExtendedDataOutput();
- }
-
- /**
- * Initialize the inner state, with a known size. Must be called before
- * {@code add()} is called.
- *
- * @param expectedSize Number of bytes to be expected
- */
- public void initialize(int expectedSize) {
- extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
- }
-
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration configuration) {
- this.configuration = configuration;
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return this.configuration;
- }
-
- /**
- * Add a message.
- * The order is: the message>id count>ids .
- *
- * @param ids The byte array which holds target ids
- * of this message on the worker
- * @param idPos The end position of the ids
- * information in the byte array above.
- * @param count The number of ids
- * @param msg The message sent
- */
- public void add(byte[] ids, int idPos, int count, M msg) {
- try {
- msg.write(extendedDataOutput);
- extendedDataOutput.writeInt(count);
- extendedDataOutput.write(ids, 0, idPos);
- } catch (IOException e) {
- throw new IllegalStateException("add: IOException", e);
- }
- }
-
- /**
- * Create a message.
- *
- * @return A created message object.
- */
- public M createMessage() {
- return messageValueFactory.newInstance();
- }
-
- /**
- * Get the number of bytes used.
- *
- * @return Bytes used
- */
- public int getSize() {
- return extendedDataOutput.getPos();
- }
-
- /**
- * Get the size of ByteArrayOneToAllMessages after serialization.
- * Here 4 is the size of an integer which represents the size of whole
- * byte array.
- *
- * @return The size (in bytes) of the serialized object
- */
- public int getSerializedSize() {
- return 4 + getSize();
- }
-
- @Override
- public void write(DataOutput dataOutput) throws IOException {
- dataOutput.writeInt(extendedDataOutput.getPos());
- dataOutput.write(extendedDataOutput.getByteArray(), 0,
- extendedDataOutput.getPos());
- }
-
- @Override
- public void readFields(DataInput dataInput) throws IOException {
- int size = dataInput.readInt();
- byte[] buf = new byte[size];
- dataInput.readFully(buf);
- extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
- }
-
- /**
- * Check if the byte array is empty.
- *
- * @return True if the position of the byte array is 0.
- */
- public boolean isEmpty() {
- return extendedDataOutput.getPos() == 0;
- }
-
- /**
- * Get the reader of this OneToAllMessages
- *
- * @return ExtendedDataInput
- */
- public ExtendedDataInput getOneToAllMessagesReader() {
- return configuration.createExtendedDataInput(
- extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
index cefec0e..962bc75 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
@@ -39,6 +39,8 @@ public class ByteStructVertexIdDataIterator<I extends WritableComparable, T>
extends ByteStructVertexIdIterator<I> implements VertexIdDataIterator<I, T> {
/** VertexIdData to iterate over */
protected AbstractVertexIdData<I, T> vertexIdData;
+ /** Serialized size of the message object in bytestore */
+ protected int dataSize;
/** Current data. */
private T data;
@@ -63,13 +65,20 @@ public class ByteStructVertexIdDataIterator<I extends WritableComparable, T>
}
try {
vertexId.readFields(extendedDataInput);
+ int initial = extendedDataInput.getPos();
vertexIdData.readData(extendedDataInput, data);
+ dataSize = extendedDataInput.getPos() - initial;
} catch (IOException e) {
throw new IllegalStateException("next: IOException", e);
}
}
@Override
+ public int getCurrentDataSize() {
+ return dataSize;
+ }
+
+ @Override
public T getCurrentData() {
return data;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
index b686211..dd91ea2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
@@ -46,4 +46,14 @@ public class ByteStructVertexIdMessageIterator<I extends WritableComparable,
public M getCurrentMessage() {
return getCurrentData();
}
+
+ @Override
+ public int getCurrentMessageSize() {
+ return getCurrentDataSize();
+ }
+
+ @Override
+ public boolean isNewMessage() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java
new file mode 100644
index 0000000..80c3aee
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Wraps a list of byte array outputs and provides convenient
+ * utilities on top of it
+ */
+public class ExtendedByteArrayOutputBuffer {
+ /**
+ * This option sets the capacity of an
+ * {@link org.apache.giraph.utils.ExtendedDataOutput} instance created in
+ * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
+ */
+ public static final IntConfOption CAPACITY_OF_DATAOUT_IN_BUFFER =
+ new IntConfOption("giraph.capacityOfDataOutInBuffer",
+ 1024 * GiraphConstants.ONE_KB,
+ "Set the capacity of dataoutputs in dataout buffer");
+
+ /**
+ * This option sets the maximum fraction of a
+ * {@link org.apache.giraph.utils.ExtendedDataOutput} instance (stored in
+ * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer})
+ * that can be filled
+ */
+ public static final FloatConfOption FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER =
+ new FloatConfOption("giraph.fillingThresholdOfDataoutInBuffer", 0.98f,
+ "Set the maximum fraction of dataoutput capacity allowed to fill");
+
+ /** Maximum size allowed for one byte array output */
+ private final int maxBufSize;
+ /** Stop writing to buffer after threshold has been reached */
+ private final int threshold;
+ /** Giraph configuration */
+ private final ImmutableClassesGiraphConfiguration<?, ? , ?> config;
+
+ /** Map of index => byte array outputs */
+ private final Int2ObjectOpenHashMap<ExtendedDataOutput>
+ bytearrayOutputs = new Int2ObjectOpenHashMap<>();
+ /** Size of byte array outputs map */
+ private final AtomicInteger mapSize = new AtomicInteger(0);
+ /** Thread local variable to get hold of a byte array output stream */
+ private final ThreadLocal<IndexAndDataOut> threadLocal =
+ new ThreadLocal<IndexAndDataOut>() {
+ @Override
+ protected IndexAndDataOut initialValue() {
+ return newIndexAndDataOutput();
+ }
+ };
+
+ /**
+ * Constructor
+ *
+ * @param config configuration
+ */
+ public ExtendedByteArrayOutputBuffer(
+ ImmutableClassesGiraphConfiguration<?, ?, ?> config) {
+ this.config = config;
+
+ maxBufSize = CAPACITY_OF_DATAOUT_IN_BUFFER.get(config);
+ threshold = (int) (FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER.get(config) *
+ maxBufSize);
+ }
+
+ /**
+ * Return threadLocal indexAndDataOutput instance
+ *
+ * @return threadLocal indexAndDataOutput instance
+ */
+ public IndexAndDataOut getIndexAndDataOut() {
+ IndexAndDataOut indexAndDataOut = threadLocal.get();
+ if (indexAndDataOut.dataOutput.getPos() >= threshold) {
+ indexAndDataOut = newIndexAndDataOutput();
+ threadLocal.set(indexAndDataOut);
+ }
+ return indexAndDataOut;
+ }
+
+ /**
+ * Get dataoutput from bytearrayOutputs
+ *
+ * @param index index in bytearrayOutputs
+ * @return extendeddataoutput at given index
+ */
+ public ExtendedDataOutput getDataOutput(int index) {
+ return bytearrayOutputs.get(index);
+ }
+
+ /**
+ * Holder for index & DataOutput objects
+ */
+ public static class IndexAndDataOut {
+ /** Index */
+ private final int index;
+ /** DataOutput instance */
+ private final ExtendedDataOutput dataOutput;
+
+ /**
+ * Constructor
+ *
+ * @param index index in bytearrayOutputs
+ * @param dataOutput dataoutput
+ */
+ public IndexAndDataOut(int index, ExtendedDataOutput dataOutput) {
+ this.index = index;
+ this.dataOutput = dataOutput;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public ExtendedDataOutput getDataOutput() {
+ return dataOutput;
+ }
+ }
+
+ /**
+ * Create a new IndexAndDataOut instance
+ * @return new IndexAndDataOut instance
+ */
+ private IndexAndDataOut newIndexAndDataOutput() {
+ int index = mapSize.getAndIncrement();
+ ExtendedDataOutput output = config.createExtendedDataOutput(
+ maxBufSize);
+ synchronized (bytearrayOutputs) {
+ bytearrayOutputs.put(index, output);
+ }
+ return new IndexAndDataOut(index, output);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
index bc979af..0da9681 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
@@ -82,4 +82,3 @@ public interface ExtendedDataOutput extends DataOutput {
*/
void reset();
}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java
new file mode 100644
index 0000000..f353b2d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * VertexIdData iterator for
+ * {@link ByteArrayOneMessageToManyIds}
+ *
+ * @param <I> vertexId type
+ * @param <M> message type
+ */
+public class OneMessageToManyIdsIterator<I extends WritableComparable,
+ M extends Writable> implements VertexIdMessageIterator<I, M> {
+ /** VertexIdMessages object to iterate over */
+ private final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages;
+ /** Reader of the serialized messages and vertex ids */
+ private final ExtendedDataInput extendedDataInput;
+
+ /** Current vertex id */
+ private I vertexId;
+ /** Current message */
+ private M msg;
+ /** Count of ids left to read before next message */
+ private int idsToRead = 0;
+ /** Size of message read */
+ private int msgSize = 0;
+ /** Is current message newly read */
+ private boolean newMessage;
+
+ /**
+ * Constructor
+ *
+ * @param vertexIdMessages vertexId messages object to iterate over
+ */
+ public OneMessageToManyIdsIterator(
+ final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages) {
+ this.vertexIdMessages = vertexIdMessages;
+ this.extendedDataInput = vertexIdMessages.getConf()
+ .createExtendedDataInput(vertexIdMessages.extendedDataOutput);
+ }
+
+ @Override
+ public I getCurrentVertexId() {
+ return vertexId;
+ }
+
+ @Override
+ public M getCurrentMessage() {
+ return getCurrentData();
+ }
+
+ @Override
+ public M getCurrentData() {
+ return msg;
+ }
+
+ @Override
+ public M releaseCurrentData() {
+ M releasedData = msg;
+ msg = null;
+ return releasedData;
+ }
+
+ @Override
+ public I releaseCurrentVertexId() {
+ I releasedVertexId = vertexId;
+ vertexId = null;
+ return releasedVertexId;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return extendedDataInput.available() > 0;
+ }
+
+ /**
+ * Create vertexId & msg objects if missing, before reading in next()
+ */
+ private void initialize() {
+ if (vertexId == null) {
+ vertexId = vertexIdMessages.getConf().createVertexId();
+ }
+ if (msg == null) {
+ msg = vertexIdMessages.createData();
+ }
+ }
+
+ @Override
+ public void next() {
+ initialize();
+ try {
+ if (idsToRead == 0) {
+ newMessage = true; // a new message is read
+ int initial = extendedDataInput.getPos();
+ msg.readFields(extendedDataInput);
+ msgSize = extendedDataInput.getPos() - initial;
+ idsToRead = extendedDataInput.readInt();
+ } else {
+ newMessage = false; // same as previous message
+ }
+ vertexId.readFields(extendedDataInput);
+ idsToRead -= 1;
+ } catch (IOException e) {
+ throw new IllegalStateException("next: IOException", e);
+ }
+ }
+
+ @Override
+ public int getCurrentMessageSize() {
+ return getCurrentDataSize();
+ }
+
+ @Override
+ public int getCurrentDataSize() {
+ return msgSize;
+ }
+
+ @Override
+ public boolean isNewMessage() {
+ return newMessage;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
index 1ab8de6..c5587e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
@@ -55,7 +55,7 @@ public class UnsafeArrayReads extends UnsafeReads {
UNSAFE.arrayBaseOffset(byte[].class);
/** Byte buffer */
- private final byte[] buf;
+ protected byte[] buf;
/**
* Constructor
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
index 5f99846..39ab352 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
@@ -28,7 +28,7 @@ import java.io.UTFDataFormatException;
public abstract class UnsafeReads implements ExtendedDataInput {
/** Buffer length */
- protected final int bufLength;
+ protected int bufLength;
/** Position in the buffer */
protected long pos = 0;
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
new file mode 100644
index 0000000..a75815a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * UnsafeReusableByteArrayInput is a data structure to read from a
+ * byte buffer with a read pointer that can be moved to desired location
+ */
+public class UnsafeReusableByteArrayInput extends UnsafeArrayReads {
+
+ /**
+ * Default Constructor
+ */
+ public UnsafeReusableByteArrayInput() {
+ super(null, 0, 0);
+ }
+
+ /**
+ * Initialize the object with all required parameters
+ *
+ * @param buf byte buffer
+ * @param offset offset in the buffer
+ * @param length length of the valid data
+ */
+ public void initialize(byte[] buf, int offset, int length) {
+ this.buf = buf;
+ this.pos = offset;
+ this.bufLength = length;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
index 6aea8ea..80792a5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
@@ -41,6 +41,13 @@ public interface VertexIdDataIterator<I extends WritableComparable, T>
T getCurrentData();
/**
+ * Get serialized size of current data
+ *
+ * @return serialized size of data
+ */
+ int getCurrentDataSize();
+
+ /**
* Release the current data object.
*
* @return Released data object
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
index c241cea..288f7ce 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
@@ -36,4 +36,18 @@ public interface VertexIdMessageIterator<I extends WritableComparable,
* @return Current message
*/
M getCurrentMessage();
+
+ /**
+ * Get the serialized size of current message
+ *
+ * @return serialized size of current message
+ */
+ int getCurrentMessageSize();
+
+ /**
+ * Return true if current message is new
+ *
+ * @return true if current message is new
+ */
+ boolean isNewMessage();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 8037db9..b56bab3 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -24,7 +24,7 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
import org.apache.giraph.comm.requests.SendVertexRequest;
import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
-import org.apache.giraph.comm.requests.SendWorkerOneToAllMessagesRequest;
+import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -36,8 +36,8 @@ import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.ByteArrayOneToAllMessages;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.IntNoOpComputation;
@@ -196,10 +196,10 @@ public class RequestTest {
}
@Test
- public void sendWorkerOneToAllMessagesRequest() throws IOException {
+ public void sendWorkerIndividualMessagesRequest() throws IOException {
// Data to send
- ByteArrayOneToAllMessages<IntWritable, IntWritable>
- dataToSend = new ByteArrayOneToAllMessages<>(new
+ ByteArrayOneMessageToManyIds<IntWritable, IntWritable>
+ dataToSend = new ByteArrayOneMessageToManyIds<>(new
TestMessageValueFactory<>(IntWritable.class));
dataToSend.setConf(conf);
dataToSend.initialize();
@@ -211,8 +211,8 @@ public class RequestTest {
dataToSend.add(output.getByteArray(), output.getPos(), 7, new IntWritable(1));
// Send the request
- SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable> request =
- new SendWorkerOneToAllMessagesRequest<>(dataToSend, conf);
+ SendWorkerOneMessageToManyRequest<IntWritable, IntWritable> request =
+ new SendWorkerOneMessageToManyRequest<>(dataToSend, conf);
client.sendWritableRequest(workerInfo.getTaskId(), request);
client.waitAllRequests();
http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index d3c392e..5903eb8 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -25,7 +25,7 @@ import junit.framework.Assert;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.DoubleSumMessageCombiner;
-import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;