You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by rv...@apache.org on 2014/10/26 02:21:55 UTC

[12/47] GIRAPH-912: Support succinct representation of messages in messagestores (pavanka)

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
new file mode 100644
index 0000000..092d963
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.VertexIdMessageBytesIterator;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.giraph.utils.VerboseByteStructMessageWrite;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.io.DataInputOutput;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> Message type
+ */
+public class LongByteArrayMessageStore<M extends Writable>
+  extends LongAbstractMessageStore<M, DataInputOutput> {
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param service             Service worker
+   * @param config              Hadoop configuration
+   */
+  public LongByteArrayMessageStore(
+      MessageValueFactory<M> messageValueFactory,
+      CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+      ImmutableClassesGiraphConfiguration<LongWritable,
+          Writable, Writable> config) {
+    super(messageValueFactory, service, config);
+  }
+
+  @Override
+  public boolean isPointerListEncoding() {
+    return false;
+  }
+
+  /**
+   * Get the DataInputOutput for a vertex id, creating if necessary.
+   *
+   * @param partitionMap Partition map to look in
+   * @param vertexId Id of the vertex
+   * @return DataInputOutput for this vertex id (created if necessary)
+   */
+  private DataInputOutput getDataInputOutput(
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap, long vertexId) {
+    DataInputOutput dataInputOutput = partitionMap.get(vertexId);
+    if (dataInputOutput == null) {
+      dataInputOutput = config.createMessagesInputOutput();
+      partitionMap.put(vertexId, dataInputOutput);
+    }
+    return dataInputOutput;
+  }
+
+  @Override
+  public void addPartitionMessages(int partitionId,
+    VertexIdMessages<LongWritable, M> messages) throws IOException {
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId);
+    synchronized (partitionMap) {
+      VertexIdMessageBytesIterator<LongWritable, M>
+          vertexIdMessageBytesIterator =
+          messages.getVertexIdMessageBytesIterator();
+      // Try to copy the message buffer over rather than
+      // doing a deserialization of a message just to know its size.  This
+      // should be more efficient for complex objects where serialization is
+      // expensive.  If this type of iterator is not available, fall back to
+      // deserializing/serializing the messages
+      if (vertexIdMessageBytesIterator != null) {
+        while (vertexIdMessageBytesIterator.hasNext()) {
+          vertexIdMessageBytesIterator.next();
+          DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
+              vertexIdMessageBytesIterator.getCurrentVertexId().get());
+          vertexIdMessageBytesIterator.writeCurrentMessageBytes(
+              dataInputOutput.getDataOutput());
+        }
+      } else {
+        VertexIdMessageIterator<LongWritable, M>
+            iterator = messages.getVertexIdMessageIterator();
+        while (iterator.hasNext()) {
+          iterator.next();
+          DataInputOutput dataInputOutput =  getDataInputOutput(partitionMap,
+              iterator.getCurrentVertexId().get());
+          VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
+              dataInputOutput.getDataOutput());
+        }
+      }
+    }
+  }
+
+  @Override
+  public void finalizeStore() {
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(
+    LongWritable vertexId) throws IOException {
+    DataInputOutput dataInputOutput =
+        getPartitionMap(vertexId).get(vertexId.get());
+    if (dataInputOutput == null) {
+      return EmptyIterable.get();
+    } else {
+      return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
+    }
+  }
+
+  @Override
+  public void writePartition(DataOutput out, int partitionId)
+    throws IOException {
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+        map.get(partitionId);
+    out.writeInt(partitionMap.size());
+    ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
+        partitionMap.long2ObjectEntrySet().fastIterator();
+    while (iterator.hasNext()) {
+      Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
+      out.writeLong(entry.getLongKey());
+      entry.getValue().write(out);
+    }
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in,
+    int partitionId) throws IOException {
+    int size = in.readInt();
+    Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
+        new Long2ObjectOpenHashMap<DataInputOutput>(size);
+    while (size-- > 0) {
+      long vertexId = in.readLong();
+      DataInputOutput dataInputOutput = config.createMessagesInputOutput();
+      dataInputOutput.readFields(in);
+      partitionMap.put(vertexId, dataInputOutput);
+    }
+    synchronized (map) {
+      map.put(partitionId, partitionMap);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
new file mode 100644
index 0000000..32296ad
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PointerListMessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
+
+/**
+ * This stores messages in
+ * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
+ * and stores long pointers that point to serialized messages
+ *
+ * @param <M> message type
+ */
+public class LongPointerListMessageStore<M extends Writable>
+  extends LongAbstractListMessageStore<M, LongArrayList>
+  implements MessageStore<LongWritable, M> {
+
+  /** Buffers of byte array outputs used to store messages - thread safe */
+  private final ExtendedByteArrayOutputBuffer bytesBuffer;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param service             Service worker
+   * @param config              Hadoop configuration
+   */
+  public LongPointerListMessageStore(
+    MessageValueFactory<M> messageValueFactory,
+    CentralizedServiceWorker<LongWritable, Writable, Writable> service,
+    ImmutableClassesGiraphConfiguration<LongWritable,
+    Writable, Writable> config) {
+    super(messageValueFactory, service, config);
+    bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
+  }
+
+  @Override
+  public boolean isPointerListEncoding() {
+    return true;
+  }
+
+  @Override
+  protected LongArrayList createList() {
+    return new LongArrayList();
+  }
+
+  @Override
+  public void addPartitionMessages(int partitionId,
+    VertexIdMessages<LongWritable, M> messages) throws IOException {
+    VertexIdMessageIterator<LongWritable, M> iterator =
+        messages.getVertexIdMessageIterator();
+    long pointer = 0;
+    LongArrayList list;
+    while (iterator.hasNext()) {
+      iterator.next();
+      M msg = iterator.getCurrentMessage();
+      list = getList(iterator);
+      if (iterator.isNewMessage()) {
+        IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+        pointer = indexAndDataOut.getIndex();
+        pointer <<= 32;
+        ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+        pointer += dataOutput.getPos();
+        msg.write(dataOutput);
+      }
+      synchronized (list) { // TODO - any better way?
+        list.add(pointer);
+      }
+    }
+  }
+
+  @Override
+  public Iterable<M> getVertexMessages(
+    LongWritable vertexId) throws IOException {
+    LongArrayList list = getPartitionMap(vertexId).get(
+        vertexId.get());
+    if (list == null) {
+      return EmptyIterable.get();
+    } else {
+      return new PointerListMessagesIterable<>(messageValueFactory,
+        list, bytesBuffer);
+    }
+  }
+
+  // FIXME -- complete these for check-pointing
+  @Override
+  public void writePartition(DataOutput out, int partitionId)
+    throws IOException {
+  }
+
+  @Override
+  public void readFieldsForPartition(DataInput in, int partitionId)
+    throws IOException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java
new file mode 100644
index 0000000..121d1db
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Message store based off of primitives when I = LongWritable
+ */
+package org.apache.giraph.comm.messages.primitives.long_id;

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
index ef3f824..f762f46 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerClientRequestProcessor.java
@@ -22,9 +22,9 @@ import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.util.PercentGauge;
 import org.apache.giraph.bsp.BspService;
 import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.SendOneMessageToManyCache;
 import org.apache.giraph.comm.SendEdgeCache;
 import org.apache.giraph.comm.SendMessageCache;
-import org.apache.giraph.comm.SendMessageToAllCache;
 import org.apache.giraph.comm.SendMutationsCache;
 import org.apache.giraph.comm.SendPartitionCache;
 import org.apache.giraph.comm.ServerData;
@@ -134,9 +134,9 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
         GiraphConfiguration.MAX_MSG_REQUEST_SIZE.get(conf);
     maxVerticesSizePerWorker =
         GiraphConfiguration.MAX_VERTEX_REQUEST_SIZE.get(conf);
-    if (this.configuration.isOneToAllMsgSendingEnabled()) {
+    if (this.configuration.useOneMessageToManyIdsEncoding()) {
       sendMessageCache =
-        new SendMessageToAllCache<I, Writable>(conf, serviceWorker,
+        new SendOneMessageToManyCache<I, Writable>(conf, serviceWorker,
           this, maxMessagesSizePerWorker);
     } else {
       sendMessageCache =
@@ -395,7 +395,7 @@ public class NettyWorkerClientRequestProcessor<I extends WritableComparable,
   @Override
   public void flush() throws IOException {
     // Execute the remaining sends messages (if any)
-    // including one-to-one and one-to-all messages.
+    // including individual and compact messages.
     sendMessageCache.flush();
 
     // Execute the remaining sends vertices (if any)

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index 22ecc0e..81c892d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -122,7 +122,7 @@ public class NettyWorkerServer<I extends WritableComparable,
 
   @Override
   public void prepareSuperstep() {
-    serverData.prepareSuperstep();
+    serverData.prepareSuperstep(); // updates the current message-store
     resolveMutations();
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index 408295c..c7561ee 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -38,9 +38,9 @@ public enum RequestType {
   SEND_WORKER_VERTICES_REQUEST(SendWorkerVerticesRequest.class),
   /** Sending a partition of messages for next superstep */
   SEND_WORKER_MESSAGES_REQUEST(SendWorkerMessagesRequest.class),
-  /** Sending one-to-all messages to a worker for next superstep */
-  SEND_WORKER_ONETOALL_MESSAGES_REQUEST(
-    SendWorkerOneToAllMessagesRequest.class),
+  /** Sending one message to many ids in a single request */
+  SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST(
+      SendWorkerOneMessageToManyRequest.class),
   /**
    * Sending a partition of messages for current superstep
    * (used during partition exchange)

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
new file mode 100644
index 0000000..798ddfa
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneMessageToManyRequest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
+import org.apache.giraph.utils.ByteArrayVertexIdMessages;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Send a collection of ByteArrayOneMessageToManyIds messages to a worker.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+@SuppressWarnings("unchecked")
+public class SendWorkerOneMessageToManyRequest<I extends WritableComparable,
+    M extends Writable> extends WritableRequest<I, Writable, Writable>
+    implements WorkerRequest<I, Writable, Writable> {
+  /** ByteArrayOneMessageToManyIds encoding of vertexId & messages */
+  protected ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds;
+
+  /**
+   * Constructor used for reflection only.
+   */
+  public SendWorkerOneMessageToManyRequest() { }
+
+  /**
+   * Constructor used to send request.
+   *
+   * @param oneMessageToManyIds ByteArrayOneMessageToManyIds
+   * @param conf ImmutableClassesGiraphConfiguration
+   */
+  public SendWorkerOneMessageToManyRequest(
+      ByteArrayOneMessageToManyIds<I, M> oneMessageToManyIds,
+      ImmutableClassesGiraphConfiguration conf) {
+    this.oneMessageToManyIds = oneMessageToManyIds;
+    setConf(conf);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.SEND_WORKER_ONE_MESSAGE_TO_MANY_REQUEST;
+  }
+
+  @Override
+  public void readFieldsRequest(DataInput input) throws IOException {
+    oneMessageToManyIds = new ByteArrayOneMessageToManyIds<I, M>(
+      getConf().<M>getOutgoingMessageValueFactory());
+    oneMessageToManyIds.setConf(getConf());
+    oneMessageToManyIds.readFields(input);
+  }
+
+  @Override
+  public void writeRequest(DataOutput output) throws IOException {
+    this.oneMessageToManyIds.write(output);
+  }
+
+  @Override
+  public int getSerializedSize() {
+    return super.getSerializedSize() +
+        this.oneMessageToManyIds.getSerializedSize();
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    try {
+      MessageStore<I, M> messageStore = serverData.getIncomingMessageStore();
+      if (messageStore.isPointerListEncoding()) {
+        // if message store is pointer list based then send data as is
+        messageStore.addPartitionMessages(-1, oneMessageToManyIds);
+      } else { // else split the data per partition and send individually
+        CentralizedServiceWorker<I, ?, ?> serviceWorker =
+            serverData.getServiceWorker();
+        // Get the initial size of ByteArrayVertexIdMessages per partition
+        // on this worker. To make sure every ByteArrayVertexIdMessages to have
+        // enough space to store the messages, we divide the original
+        // ByteArrayOneMessageToManyIds message size by the number of partitions
+        // and double the size
+        // (Assume the major component in ByteArrayOneMessageToManyIds message
+        // is a id list. Now each target id has a copy of message,
+        // therefore we double the buffer size)
+        // to get the initial size of ByteArrayVertexIdMessages.
+        int initialSize = oneMessageToManyIds.getSize() /
+            serverData.getPartitionStore().getNumPartitions() * 2;
+        // Create ByteArrayVertexIdMessages for
+        // message reformatting.
+        Int2ObjectOpenHashMap<ByteArrayVertexIdMessages> partitionIdMsgs =
+            new Int2ObjectOpenHashMap<>();
+
+        // Put data from ByteArrayOneMessageToManyIds
+        // to ByteArrayVertexIdMessages
+        VertexIdMessageIterator<I, M> vertexIdMessageIterator =
+          oneMessageToManyIds.getVertexIdMessageIterator();
+        while (vertexIdMessageIterator.hasNext()) {
+          vertexIdMessageIterator.next();
+          M msg = vertexIdMessageIterator.getCurrentMessage();
+          I vertexId = vertexIdMessageIterator.getCurrentVertexId();
+          PartitionOwner owner =
+              serviceWorker.getVertexPartitionOwner(vertexId);
+          int partitionId = owner.getPartitionId();
+          ByteArrayVertexIdMessages<I, M> idMsgs = partitionIdMsgs
+              .get(partitionId);
+          if (idMsgs == null) {
+            idMsgs = new ByteArrayVertexIdMessages<>(
+                getConf().<M>getOutgoingMessageValueFactory());
+            idMsgs.setConf(getConf());
+            idMsgs.initialize(initialSize);
+            partitionIdMsgs.put(partitionId, idMsgs);
+          }
+          idMsgs.add(vertexId, msg);
+        }
+
+        // Read ByteArrayVertexIdMessages and write to message store
+        for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
+            partitionIdMsgs.entrySet()) {
+          if (!idMsgs.getValue().isEmpty()) {
+            serverData.getIncomingMessageStore().addPartitionMessages(
+                idMsgs.getKey(), idMsgs.getValue());
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("doRequest: Got IOException ", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
deleted file mode 100644
index 5f1ed53..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/SendWorkerOneToAllMessagesRequest.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.requests;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map.Entry;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.ServerData;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.ByteArrayOneToAllMessages;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataInput;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Send a collection of one-to-all messages to a worker.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-@SuppressWarnings("unchecked")
-public class SendWorkerOneToAllMessagesRequest<I extends WritableComparable,
-    M extends Writable> extends WritableRequest<I, Writable, Writable>
-    implements WorkerRequest<I, Writable, Writable> {
-  /** The byte array of one-to-all messages */
-  private ByteArrayOneToAllMessages<I, M> oneToAllMsgs;
-
-  /**
-   * Constructor used for reflection only.
-   */
-  public SendWorkerOneToAllMessagesRequest() { }
-
-  /**
-   * Constructor used to send request.
-   *
-   * @param oneToAllMsgs A byte array of all one-to-all messages
-   * @param conf ImmutableClassesGiraphConfiguration
-   */
-  public SendWorkerOneToAllMessagesRequest(
-      ByteArrayOneToAllMessages<I, M> oneToAllMsgs,
-      ImmutableClassesGiraphConfiguration conf) {
-    this.oneToAllMsgs = oneToAllMsgs;
-    setConf(conf);
-  }
-
-  @Override
-  public RequestType getType() {
-    return RequestType.SEND_WORKER_ONETOALL_MESSAGES_REQUEST;
-  }
-
-  @Override
-  public void readFieldsRequest(DataInput input) throws IOException {
-    oneToAllMsgs = new ByteArrayOneToAllMessages<I, M>(
-      getConf().<M>getOutgoingMessageValueFactory());
-    oneToAllMsgs.setConf(getConf());
-    oneToAllMsgs.readFields(input);
-  }
-
-  @Override
-  public void writeRequest(DataOutput output) throws IOException {
-    this.oneToAllMsgs.write(output);
-  }
-
-  @Override
-  public int getSerializedSize() {
-    return super.getSerializedSize() + this.oneToAllMsgs.getSerializedSize();
-  }
-
-  @Override
-  public void doRequest(ServerData serverData) {
-    CentralizedServiceWorker<I, ?, ?> serviceWorker =
-      serverData.getServiceWorker();
-    // Get the initial size of ByteArrayVertexIdMessages per partition
-    // on this worker. To make sure every ByteArrayVertexIdMessages to have
-    // enough space to store the messages, we divide the original one-to-all
-    // message size by the number of partitions and double the size
-    // (Assume the major component in one-to-all message is a id list.
-    // Now each target id has a copy of message,
-    // therefore we double the buffer size)
-    // to get the initial size of ByteArrayVertexIdMessages.
-    int initialSize = oneToAllMsgs.getSize() /
-      serverData.getPartitionStore().getNumPartitions() * 2;
-    // Create ByteArrayVertexIdMessages for
-    // message reformatting.
-    Int2ObjectOpenHashMap<ByteArrayVertexIdMessages>
-      partitionIdMsgs =
-        new Int2ObjectOpenHashMap<ByteArrayVertexIdMessages>();
-
-    // Put data from ByteArrayOneToAllMessages to ByteArrayVertexIdMessages
-    ExtendedDataInput reader = oneToAllMsgs.getOneToAllMessagesReader();
-    I vertexId = getConf().createVertexId();
-    M msg = oneToAllMsgs.createMessage();
-    int idCount = 0;
-    int partitionId = 0;
-    try {
-      while (!reader.endOfInput()) {
-        msg.readFields(reader);
-        idCount = reader.readInt();
-        for (int i = 0; i < idCount; i++) {
-          vertexId.readFields(reader);
-          PartitionOwner owner =
-            serviceWorker.getVertexPartitionOwner(vertexId);
-          partitionId = owner.getPartitionId();
-          ByteArrayVertexIdMessages<I, M> idMsgs =
-            partitionIdMsgs.get(partitionId);
-          if (idMsgs == null) {
-            idMsgs = new ByteArrayVertexIdMessages<I, M>(
-              getConf().<M>getOutgoingMessageValueFactory());
-            idMsgs.setConf(getConf());
-            idMsgs.initialize(initialSize);
-            partitionIdMsgs.put(partitionId, idMsgs);
-          }
-          idMsgs.add(vertexId, msg);
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("doRequest: Got IOException ", e);
-    }
-    // Read ByteArrayVertexIdMessages and write to message store
-    try {
-      for (Entry<Integer, ByteArrayVertexIdMessages> idMsgs :
-          partitionIdMsgs.entrySet()) {
-        if (!idMsgs.getValue().isEmpty()) {
-          serverData.getIncomingMessageStore().addPartitionMessages(
-            idMsgs.getKey(), idMsgs.getValue());
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException("doRequest: Got IOException.", e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index ee88b04..953f49f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -1176,23 +1176,18 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
-   * Enable communication optimization for one-to-all messages.
-   */
-  public void enableOneToAllMsgSending() {
-    ONE_TO_ALL_MSG_SENDING.set(this, true);
-  }
-
-  /**
-   * Return if one-to-all messsage sending is enabled.
+   * Return if oneMessageToManyIds encoding can be enabled
    *
-   * @return True if this option is enabled.
+   * @return True if this option is true.
    */
-  public boolean isOneToAllMsgSendingEnabled() {
-    return ONE_TO_ALL_MSG_SENDING.isTrue(this);
+  public boolean useOneMessageToManyIdsEncoding() {
+    return MESSAGE_ENCODE_AND_STORE_TYPE.get(this)
+      .useOneMessageToManyIdsEncoding();
   }
 
   /**
    * Get option whether to create a source vertex present only in edge input
+   *
    * @return CREATE_EDGE_SOURCE_VERTICES option
    */
   public boolean getCreateSourceVertex() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 1879a25..ab0570f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -21,6 +21,7 @@ import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.comm.messages.InMemoryMessageStoreFactory;
+import org.apache.giraph.comm.messages.MessageEncodeAndStoreType;
 import org.apache.giraph.comm.messages.MessageStoreFactory;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.EdgeStoreFactory;
@@ -556,12 +557,6 @@ public interface GiraphConstants {
       new IntConfOption("giraph.nettyRequestEncoderBufferSize", 32 * ONE_KB,
           "How big to make the encoder buffer?");
 
-  /** Whether or not netty request encoder should use direct byte buffers */
-  BooleanConfOption NETTY_REQUEST_ENCODER_USE_DIRECT_BUFFERS =
-      new BooleanConfOption("giraph.nettyRequestEncoderUseDirectBuffers",
-                            false, "Whether or not netty request encoder " +
-                                   "should use direct byte buffers");
-
   /** Netty client threads */
   IntConfOption NETTY_CLIENT_THREADS =
       new IntConfOption("giraph.nettyClientThreads", 4, "Netty client threads");
@@ -1054,13 +1049,14 @@ public interface GiraphConstants {
           "edges every time.");
 
   /**
-   * This option will enable communication optimization for one-to-all
-   * message sending. For multiple target ids on the same machine,
-   * we only send one message to all the targets.
+   * This option will tell which message encode & store enum to use when
+   * combining is not enabled
    */
-  BooleanConfOption ONE_TO_ALL_MSG_SENDING =
-    new BooleanConfOption("giraph.oneToAllMsgSending", false, "Enable " +
-        "one-to-all message sending strategy");
+  EnumConfOption<MessageEncodeAndStoreType> MESSAGE_ENCODE_AND_STORE_TYPE =
+      EnumConfOption.create("giraph.messageEncodeAndStoreType",
+          MessageEncodeAndStoreType.class,
+          MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION,
+          "Select the message_encode_and_store_type to use");
 
   /**
    * This option can be used to specify if a source vertex present in edge

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java
new file mode 100644
index 0000000..674b0b0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneMessageToManyIds.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Stores a message and a list of target vertex ids.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+@SuppressWarnings("unchecked")
+public class ByteArrayOneMessageToManyIds<I extends WritableComparable,
+  M extends Writable> extends ByteArrayVertexIdData<I, M>
+  implements VertexIdMessages<I, M> {
+  /** Message value class */
+  private MessageValueFactory<M> messageValueFactory;
+
+  /**
+   * Constructor.
+   *
+   * @param messageValueFactory Class for messages
+   */
+  public ByteArrayOneMessageToManyIds(
+      MessageValueFactory<M> messageValueFactory) {
+    this.messageValueFactory = messageValueFactory;
+  }
+
+  @Override
+  public M createData() {
+    return messageValueFactory.newInstance();
+  }
+
+  @Override
+  public void writeData(ExtendedDataOutput out, M message) throws IOException {
+    message.write(out);
+  }
+
+  @Override
+  public void readData(ExtendedDataInput in, M message) throws IOException {
+    message.readFields(in);
+  }
+
+  /**
+   * Add a message.
+   * The order is: the message>id count>ids .
+   *
+   * @param ids   The byte array which holds target ids
+   *              of this message on the worker
+   * @param idPos The end position of the ids
+   *              information in the byte array above.
+   * @param count The number of ids
+   * @param msg   The message sent
+   */
+  public void add(byte[] ids, int idPos, int count, M msg) {
+    try {
+      msg.write(extendedDataOutput);
+      extendedDataOutput.writeInt(count);
+      extendedDataOutput.write(ids, 0, idPos);
+    } catch (IOException e) {
+      throw new IllegalStateException("add: IOException", e);
+    }
+  }
+
+  @Override
+  public void add(I vertexId, M data) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void add(byte[] serializedId, int idPos, M data) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public VertexIdMessageBytesIterator<I, M> getVertexIdMessageBytesIterator() {
+    return null;
+  }
+
+  @Override
+  public VertexIdMessageIterator<I, M> getVertexIdMessageIterator() {
+    return new OneMessageToManyIdsIterator<>(this);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java
deleted file mode 100644
index f190c17..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteArrayOneToAllMessages.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.utils;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Stores a message and a list of target vertex ids.
- *
- * @param <I> Vertex id
- * @param <M> Message data
- */
-@SuppressWarnings("unchecked")
-public class ByteArrayOneToAllMessages<
-    I extends WritableComparable, M extends Writable>
-    implements Writable, ImmutableClassesGiraphConfigurable {
-  /** Extended data output */
-  private ExtendedDataOutput extendedDataOutput;
-  /** Configuration */
-  private ImmutableClassesGiraphConfiguration<I, ?, ?> configuration;
-  /** Message value class */
-  private MessageValueFactory<M> messageValueFactory;
-
-  /**
-   * Constructor.
-   *
-   * @param messageValueFactory Class for messages
-   */
-  public ByteArrayOneToAllMessages(
-      MessageValueFactory<M> messageValueFactory) {
-    this.messageValueFactory = messageValueFactory;
-  }
-
-  /**
-   * Initialize the inner state. Must be called before {@code add()} is called.
-   */
-  public void initialize() {
-    extendedDataOutput = configuration.createExtendedDataOutput();
-  }
-
-  /**
-   * Initialize the inner state, with a known size. Must be called before
-   * {@code add()} is called.
-   *
-   * @param expectedSize Number of bytes to be expected
-   */
-  public void initialize(int expectedSize) {
-    extendedDataOutput = configuration.createExtendedDataOutput(expectedSize);
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration configuration) {
-    this.configuration = configuration;
-  }
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return this.configuration;
-  }
-
-  /**
-   * Add a message.
-   * The order is: the message>id count>ids .
-   *
-   * @param ids The byte array which holds target ids
-   *                     of this message on the worker
-   * @param idPos The end position of the ids
-   *                     information in the byte array above.
-   * @param count The number of ids
-   * @param msg The message sent
-   */
-  public void add(byte[] ids, int idPos, int count, M msg) {
-    try {
-      msg.write(extendedDataOutput);
-      extendedDataOutput.writeInt(count);
-      extendedDataOutput.write(ids, 0, idPos);
-    } catch (IOException e) {
-      throw new IllegalStateException("add: IOException", e);
-    }
-  }
-
-  /**
-   * Create a message.
-   *
-   * @return A created message object.
-   */
-  public M createMessage() {
-    return messageValueFactory.newInstance();
-  }
-
-  /**
-   * Get the number of bytes used.
-   *
-   * @return Bytes used
-   */
-  public int getSize() {
-    return extendedDataOutput.getPos();
-  }
-
-  /**
-   * Get the size of ByteArrayOneToAllMessages after serialization.
-   * Here 4 is the size of an integer which represents the size of whole
-   * byte array.
-   *
-   * @return The size (in bytes) of the serialized object
-   */
-  public int getSerializedSize() {
-    return  4 + getSize();
-  }
-
-  @Override
-  public void write(DataOutput dataOutput) throws IOException {
-    dataOutput.writeInt(extendedDataOutput.getPos());
-    dataOutput.write(extendedDataOutput.getByteArray(), 0,
-      extendedDataOutput.getPos());
-  }
-
-  @Override
-  public void readFields(DataInput dataInput) throws IOException {
-    int size = dataInput.readInt();
-    byte[] buf = new byte[size];
-    dataInput.readFully(buf);
-    extendedDataOutput = configuration.createExtendedDataOutput(buf, size);
-  }
-
-  /**
-   * Check if the byte array is empty.
-   *
-   * @return True if the position of the byte array is 0.
-   */
-  public boolean isEmpty() {
-    return extendedDataOutput.getPos() == 0;
-  }
-
-  /**
-   * Get the reader of this OneToAllMessages
-   *
-   * @return ExtendedDataInput
-   */
-  public ExtendedDataInput getOneToAllMessagesReader() {
-    return configuration.createExtendedDataInput(
-      extendedDataOutput.getByteArray(), 0, extendedDataOutput.getPos());
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
index cefec0e..962bc75 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdDataIterator.java
@@ -39,6 +39,8 @@ public class ByteStructVertexIdDataIterator<I extends WritableComparable, T>
   extends ByteStructVertexIdIterator<I> implements VertexIdDataIterator<I, T> {
   /** VertexIdData to iterate over */
   protected AbstractVertexIdData<I, T> vertexIdData;
+  /** Serialized size of the message object in bytestore */
+  protected int dataSize;
   /** Current data. */
   private T data;
 
@@ -63,13 +65,20 @@ public class ByteStructVertexIdDataIterator<I extends WritableComparable, T>
     }
     try {
       vertexId.readFields(extendedDataInput);
+      int initial = extendedDataInput.getPos();
       vertexIdData.readData(extendedDataInput, data);
+      dataSize = extendedDataInput.getPos() - initial;
     } catch (IOException e) {
       throw new IllegalStateException("next: IOException", e);
     }
   }
 
   @Override
+  public int getCurrentDataSize() {
+    return dataSize;
+  }
+
+  @Override
   public T getCurrentData() {
     return data;
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
index b686211..dd91ea2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ByteStructVertexIdMessageIterator.java
@@ -46,4 +46,14 @@ public class ByteStructVertexIdMessageIterator<I extends WritableComparable,
   public M getCurrentMessage() {
     return getCurrentData();
   }
+
+  @Override
+  public int getCurrentMessageSize() {
+    return getCurrentDataSize();
+  }
+
+  @Override
+  public boolean isNewMessage() {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java
new file mode 100644
index 0000000..80c3aee
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedByteArrayOutputBuffer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import org.apache.giraph.conf.FloatConfOption;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.IntConfOption;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Wraps a list of byte array outputs and provides convenient
+ * utilities on top of it
+ */
+public class ExtendedByteArrayOutputBuffer {
+  /**
+   * This option sets the capacity of an
+   * {@link org.apache.giraph.utils.ExtendedDataOutput} instance created in
+   * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
+   */
+  public static final IntConfOption CAPACITY_OF_DATAOUT_IN_BUFFER =
+      new IntConfOption("giraph.capacityOfDataOutInBuffer",
+          1024 * GiraphConstants.ONE_KB,
+          "Set the capacity of dataoutputs in dataout buffer");
+
+  /**
+   * This option sets the maximum fraction of a
+   * {@link org.apache.giraph.utils.ExtendedDataOutput} instance (stored in
+   * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer})
+   * that can be filled
+   */
+  public static final FloatConfOption FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER =
+      new FloatConfOption("giraph.fillingThresholdOfDataoutInBuffer", 0.98f,
+          "Set the maximum fraction of dataoutput capacity allowed to fill");
+
+  /** Maximum size allowed for one byte array output */
+  private final int maxBufSize;
+  /** Stop writing to buffer after threshold has been reached */
+  private final int threshold;
+  /** Giraph configuration */
+  private final ImmutableClassesGiraphConfiguration<?, ? , ?> config;
+
+  /** Map of index => byte array outputs */
+  private final Int2ObjectOpenHashMap<ExtendedDataOutput>
+  bytearrayOutputs = new Int2ObjectOpenHashMap<>();
+  /** Size of byte array outputs map */
+  private final AtomicInteger mapSize = new AtomicInteger(0);
+  /** Thread local variable to get hold of a byte array output stream */
+  private final ThreadLocal<IndexAndDataOut> threadLocal =
+      new ThreadLocal<IndexAndDataOut>() {
+        @Override
+        protected IndexAndDataOut initialValue() {
+          return newIndexAndDataOutput();
+        }
+      };
+
+  /**
+   * Constructor
+   *
+   * @param config configuration
+   */
+  public ExtendedByteArrayOutputBuffer(
+    ImmutableClassesGiraphConfiguration<?, ?, ?> config) {
+    this.config = config;
+
+    maxBufSize = CAPACITY_OF_DATAOUT_IN_BUFFER.get(config);
+    threshold = (int) (FILLING_THRESHOLD_OF_DATAOUT_IN_BUFFER.get(config) *
+      maxBufSize);
+  }
+
+  /**
+   * Return threadLocal indexAndDataOutput instance
+   *
+   * @return threadLocal indexAndDataOutput instance
+   */
+  public IndexAndDataOut getIndexAndDataOut() {
+    IndexAndDataOut indexAndDataOut = threadLocal.get();
+    if (indexAndDataOut.dataOutput.getPos() >= threshold) {
+      indexAndDataOut = newIndexAndDataOutput();
+      threadLocal.set(indexAndDataOut);
+    }
+    return indexAndDataOut;
+  }
+
+  /**
+   * Get dataoutput from bytearrayOutputs
+   *
+   * @param index index in bytearrayOutputs
+   * @return extendeddataoutput at given index
+   */
+  public ExtendedDataOutput getDataOutput(int index) {
+    return bytearrayOutputs.get(index);
+  }
+
+  /**
+   * Holder for index & DataOutput objects
+   */
+  public static class IndexAndDataOut {
+    /** Index */
+    private final int index;
+    /** Dataouput instance */
+    private final ExtendedDataOutput dataOutput;
+
+    /**
+     * Constructor
+     *
+     * @param index index in bytearrayOutputs
+     * @param dataOutput dataoutput
+     */
+    public IndexAndDataOut(int index, ExtendedDataOutput dataOutput) {
+      this.index = index;
+      this.dataOutput = dataOutput;
+    }
+
+    public int getIndex() {
+      return index;
+    }
+
+    public ExtendedDataOutput getDataOutput() {
+      return dataOutput;
+    }
+  }
+
+  /**
+   * Create a new IndexAndDataOutput instance
+   * @return new IndexAndDataOutput instance
+   */
+  private IndexAndDataOut newIndexAndDataOutput() {
+    int index = mapSize.getAndIncrement();
+    ExtendedDataOutput output = config.createExtendedDataOutput(
+        maxBufSize);
+    synchronized (bytearrayOutputs) {
+      bytearrayOutputs.put(index, output);
+    }
+    return new IndexAndDataOut(index, output);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
index bc979af..0da9681 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ExtendedDataOutput.java
@@ -82,4 +82,3 @@ public interface ExtendedDataOutput extends DataOutput {
    */
   void reset();
 }
-

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java
new file mode 100644
index 0000000..f353b2d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/OneMessageToManyIdsIterator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * VertexIdData iterator for
+ * {@link ByteArrayOneMessageToManyIds}
+ *
+ * @param <I> vertexId type
+ * @param <M> message type
+ */
+public class OneMessageToManyIdsIterator<I extends WritableComparable,
+    M extends Writable> implements VertexIdMessageIterator<I, M> {
+  /** VertexIdMessages object to iterate over */
+  private final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages;
+  /** Reader of the serialized edges */
+  private final ExtendedDataInput extendedDataInput;
+
+  /** Current vertex Id*/
+  private I vertexId;
+  /** Current message */
+  private M msg;
+  /** Counts of ids left to read before next message */
+  private int idsToRead = 0;
+  /** Size of message read */
+  private int msgSize = 0;
+  /** Is current message newly read */
+  private boolean newMessage;
+
+  /**
+   * Constructor
+   *
+   * @param vertexIdMessages vertexId messages object to iterate over
+   */
+  public OneMessageToManyIdsIterator(
+      final ByteArrayOneMessageToManyIds<I, M> vertexIdMessages) {
+    this.vertexIdMessages = vertexIdMessages;
+    this.extendedDataInput = vertexIdMessages.getConf()
+        .createExtendedDataInput(vertexIdMessages.extendedDataOutput);
+  }
+
+  @Override
+  public I getCurrentVertexId() {
+    return vertexId;
+  }
+
+  @Override
+  public M getCurrentMessage() {
+    return getCurrentData();
+  }
+
+  @Override
+  public M getCurrentData() {
+    return msg;
+  }
+
+  @Override
+  public M releaseCurrentData() {
+    M releasedData = msg;
+    msg = null;
+    return releasedData;
+  }
+
+  @Override
+  public I releaseCurrentVertexId() {
+    I releasedVertexId = vertexId;
+    vertexId = null;
+    return releasedVertexId;
+  }
+
+  @Override
+  public boolean hasNext() {
+    return extendedDataInput.available() > 0;
+  }
+
+  /**
+   * Properly initialize vertexId & msg object before calling next()
+   */
+  private void initialize() {
+    if (vertexId == null) {
+      vertexId = vertexIdMessages.getConf().createVertexId();
+    }
+    if (msg == null) {
+      msg = vertexIdMessages.createData();
+    }
+  }
+
+  @Override
+  public void next() {
+    initialize();
+    try {
+      if (idsToRead == 0) {
+        newMessage = true; // a new message is read
+        int initial = extendedDataInput.getPos();
+        msg.readFields(extendedDataInput);
+        msgSize = extendedDataInput.getPos() - initial;
+        idsToRead = extendedDataInput.readInt();
+      } else {
+        newMessage = false; // same as previous message
+      }
+      vertexId.readFields(extendedDataInput);
+      idsToRead -= 1;
+    } catch (IOException e) {
+      throw new IllegalStateException("next: IOException", e);
+    }
+  }
+
+  @Override
+  public int getCurrentMessageSize() {
+    return getCurrentDataSize();
+  }
+
+  @Override
+  public int getCurrentDataSize() {
+    return msgSize;
+  }
+
+  @Override
+  public boolean isNewMessage() {
+    return newMessage;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
index 1ab8de6..c5587e1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeArrayReads.java
@@ -55,7 +55,7 @@ public class UnsafeArrayReads extends UnsafeReads {
       UNSAFE.arrayBaseOffset(byte[].class);
 
   /** Byte buffer */
-  private final byte[] buf;
+  protected byte[] buf;
 
   /**
    * Constructor

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
index 5f99846..39ab352 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReads.java
@@ -28,7 +28,7 @@ import java.io.UTFDataFormatException;
 public abstract class UnsafeReads implements ExtendedDataInput {
 
   /** Buffer length */
-  protected final int bufLength;
+  protected int bufLength;
   /** Position in the buffer */
   protected long pos = 0;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
new file mode 100644
index 0000000..a75815a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/UnsafeReusableByteArrayInput.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+/**
+ * UnsafeReusableByteArrayInput is a data structure to read from a
+ * byte buffer with a read pointer that can be moved to desired location
+ */
+public class UnsafeReusableByteArrayInput extends UnsafeArrayReads {
+
+  /**
+   * Default Constructor
+   */
+  public UnsafeReusableByteArrayInput() {
+    super(null, 0, 0);
+  }
+
+  /**
+   * Initialize the object with all required parameters
+   *
+   * @param buf byte buffer
+   * @param offset offset in the buffer
+   * @param length length of the valid data
+   */
+  public void initialize(byte[] buf, int offset, int length) {
+    this.buf = buf;
+    this.pos = offset;
+    this.bufLength = length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
index 6aea8ea..80792a5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdDataIterator.java
@@ -41,6 +41,13 @@ public interface VertexIdDataIterator<I extends WritableComparable, T>
   T getCurrentData();
 
   /**
+   * Get serialized size of current data
+   *
+   * @return serialized size of data
+   */
+  int getCurrentDataSize();
+
+  /**
    * Release the current data object.
    *
    * @return Released data object

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
index c241cea..288f7ce 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VertexIdMessageIterator.java
@@ -36,4 +36,18 @@ public interface VertexIdMessageIterator<I extends WritableComparable,
    * @return Current message
    */
   M getCurrentMessage();
+
+  /**
+   * Get the serialized size of current message
+   *
+   * @return serialized size of current message
+   */
+  int getCurrentMessageSize();
+
+  /**
+   * Return true of current message is new
+   *
+   * @return true if current message is new
+   */
+  boolean isNewMessage();
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 8037db9..b56bab3 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -24,7 +24,7 @@ import org.apache.giraph.comm.netty.handler.WorkerRequestServerHandler;
 import org.apache.giraph.comm.requests.SendPartitionMutationsRequest;
 import org.apache.giraph.comm.requests.SendVertexRequest;
 import org.apache.giraph.comm.requests.SendWorkerMessagesRequest;
-import org.apache.giraph.comm.requests.SendWorkerOneToAllMessagesRequest;
+import org.apache.giraph.comm.requests.SendWorkerOneMessageToManyRequest;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -36,8 +36,8 @@ import org.apache.giraph.graph.VertexMutations;
 import org.apache.giraph.metrics.GiraphMetrics;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.PartitionStore;
+import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
 import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.ByteArrayOneToAllMessages;
 import org.apache.giraph.utils.ByteArrayVertexIdMessages;
 import org.apache.giraph.utils.ExtendedDataOutput;
 import org.apache.giraph.utils.IntNoOpComputation;
@@ -196,10 +196,10 @@ public class RequestTest {
   }
 
   @Test
-  public void sendWorkerOneToAllMessagesRequest() throws IOException {
+  public void sendWorkerIndividualMessagesRequest() throws IOException {
     // Data to send
-    ByteArrayOneToAllMessages<IntWritable, IntWritable>
-        dataToSend = new ByteArrayOneToAllMessages<>(new
+    ByteArrayOneMessageToManyIds<IntWritable, IntWritable>
+        dataToSend = new ByteArrayOneMessageToManyIds<>(new
         TestMessageValueFactory<>(IntWritable.class));
     dataToSend.setConf(conf);
     dataToSend.initialize();
@@ -211,8 +211,8 @@ public class RequestTest {
     dataToSend.add(output.getByteArray(), output.getPos(), 7, new IntWritable(1));
 
     // Send the request
-    SendWorkerOneToAllMessagesRequest<IntWritable, IntWritable> request =
-      new SendWorkerOneToAllMessagesRequest<>(dataToSend, conf);
+    SendWorkerOneMessageToManyRequest<IntWritable, IntWritable> request =
+      new SendWorkerOneMessageToManyRequest<>(dataToSend, conf);
     client.sendWritableRequest(workerInfo.getTaskId(), request);
     client.waitAllRequests();
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f31e9a32/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index d3c392e..5903eb8 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -25,7 +25,7 @@ import junit.framework.Assert;
 
 import org.apache.giraph.bsp.CentralizedServiceWorker;
 import org.apache.giraph.combiner.DoubleSumMessageCombiner;
-import org.apache.giraph.comm.messages.primitives.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
 import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;