You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:44:06 UTC

[078/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch 'feature/GEODE-917' into develop

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index 0000000,4bfd44b..a6495e2
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@@ -1,0 -1,1100 +1,1116 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.cache.tier.sockets;
+ 
+ import java.io.EOFException;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.net.Socket;
+ import java.net.SocketTimeoutException;
+ import java.nio.ByteBuffer;
+ import java.nio.channels.SocketChannel;
+ import java.util.concurrent.Semaphore;
+ import java.util.concurrent.TimeUnit;
+ 
+ import org.apache.logging.log4j.Logger;
+ 
+ import com.gemstone.gemfire.SerializationException;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
+ import com.gemstone.gemfire.internal.SocketUtils;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+ import com.gemstone.gemfire.internal.cache.tier.MessageType;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.internal.util.BlobHelper;
+ 
+ /**
+  * This class encapsulates the wire protocol. It provides accessors to
+  * encode and decode a message and  serialize it out to the wire.
+  *
+  * <PRE>
+  * msgType       - int   - 4 bytes type of message, types enumerated below
+  *
+  * msgLength     - int - 4 bytes   total length of variable length payload
+  *
+  * numberOfParts - int - 4 bytes   number of elements (LEN-BYTE* pairs)
+  *                     contained in the payload. Message can
+  *                       be a multi-part message
+  *
+  * transId       - int - 4 bytes  filled in by the requestor, copied back into
+  *                    the response
+  *
+  * flags         - byte- 1 byte   filled in by the requestor
+  * len1
+  * part1
+  * .
+  * .
+  * .
+  * lenn
+  * partn
+  * </PRE>
+  *
+  * We read the fixed length 16 bytes into a byte[] and populate a bytebuffer
+  * We read the fixed length header tokens from the header
+  * parse the header and use information contained in there to read the payload.
+  *
+  * <P>
+  *
+  * See also <a href="package-summary.html#messages">package description</a>.
+  *
+  * @see com.gemstone.gemfire.internal.cache.tier.MessageType
+  *
+  */
+ public class Message  {
+ 
++  /**
++   * maximum size of an outgoing message.  See GEODE-478
++   */
++  static final int MAX_MESSAGE_SIZE = Integer.getInteger("gemfire.client.max-message-size", 1073741824).intValue();
++
+   private static final Logger logger = LogService.getLogger();
+   
+   private static final int PART_HEADER_SIZE = 5; // 4 bytes for length, 1 byte for isObject
+   
+   private static final int FIXED_LENGTH = 17;
+ 
+   private static final ThreadLocal<ByteBuffer> tlCommBuffer = new ThreadLocal<>();
+ 
+   protected int msgType;
+   protected int payloadLength=0;
+   protected int numberOfParts =0;
+   protected int transactionId = TXManagerImpl.NOTX;
+   protected int currentPart = 0;
+   protected Part[] partsList = null;
+   protected ByteBuffer cachedCommBuffer;
+   protected Socket socket = null;
+   protected SocketChannel sockCh = null;
+   protected OutputStream os = null;
+   protected InputStream is = null;
+   protected boolean messageModified = true;
+   /** is this message a retry of a previously sent message? */
+   protected boolean isRetry;
+   private byte flags = 0x00;
+   protected MessageStats msgStats = null;
+   protected ServerConnection sc = null;
+   private int maxIncomingMessageLength = -1;
+   private Semaphore dataLimiter = null;
+ //  private int MAX_MSGS = -1;
+   private Semaphore msgLimiter = null;
+   private boolean hdrRead = false;  
+   private int chunkSize = 1024;//Default Chunk Size.
+ 
+   protected Part securePart = null;
+   private boolean isMetaRegion = false;
+ 
+ 
+   // These two statics are fields shoved into the flags byte for transmission.
+   // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other
+   // is left in place
+   public static final byte MESSAGE_HAS_SECURE_PART = (byte)0x02;
+   public static final byte MESSAGE_IS_RETRY = (byte)0x04;
+   
+   public static final byte MESSAGE_IS_RETRY_MASK = (byte)0xFB;
+ 
+   // Tentative workaround to avoid OOM stated in #46754.
+   public static final ThreadLocal<Integer> messageType = new ThreadLocal<Integer>();
+   
+   Version version;
+   
+   /**
+    * Creates a new message with the given number of parts
+    */
+   public Message(int numberOfParts, Version destVersion) {
+     this.version = destVersion;
+     Assert.assertTrue(destVersion != null, "Attempt to create an unversioned message");
+     partsList = new Part[numberOfParts];
+     this.numberOfParts = numberOfParts;
+     for (int i=0;i<partsList.length;i++) {
+       partsList[i] = new Part();
+     }
+   }
+ 
+   public boolean isSecureMode() {    
+     return securePart != null;
+   }
+   
+   public byte[] getSecureBytes()
+     throws IOException, ClassNotFoundException {
+     return (byte[])this.securePart.getObject();
+   }
+   
+   public void setMessageType(int msgType) {
+     this.messageModified = true;
+     if (!MessageType.validate(msgType)) {
+       throw new IllegalArgumentException(LocalizedStrings.Message_INVALID_MESSAGETYPE.toLocalizedString());
+     }
+     this.msgType = msgType;
+   }
+   
+   public void setVersion(Version clientVersion) {
+     this.version = clientVersion;
+   }
+ 
+   public void setMessageHasSecurePartFlag() {
+     this.flags = (byte)(this.flags | MESSAGE_HAS_SECURE_PART);
+   }
+   
+   public void clearMessageHasSecurePartFlag() {
+     this.flags = (byte)(this.flags & MESSAGE_HAS_SECURE_PART);
+   }
+ 
+   /**
+    *  Sets and builds the {@link Part}s that are sent
+    *  in the payload of the Message
+    * @param numberOfParts
+    */
+   public void setNumberOfParts(int numberOfParts) {
+     //TODO:hitesh need to add security header here from server
+     //need to insure it is not chunked message
+     //should we look message type to avoid internal message like ping
+     this.messageModified = true;
+     this.currentPart=0;
+     this.numberOfParts = numberOfParts;
+     if (numberOfParts > this.partsList.length) {
+       Part[] newPartsList = new Part[numberOfParts];
+       for (int i=0;i<numberOfParts;i++) {
+         if (i < this.partsList.length) {
+           newPartsList[i] = this.partsList[i];
+         } else {
+           newPartsList[i] = new Part();
+         }
+       }
+       this.partsList = newPartsList;
+     }
+   }
++  
++  /**
++   * For boundary testing we may need to inject mock parts
++   * @param parts
++   */
++  void setParts(Part[] parts) {
++    this.partsList = parts;
++  }
+ 
+   public void setTransactionId(int transactionId) {
+     this.messageModified = true;
+     this.transactionId = transactionId;
+   }
+   
+   public void setIsRetry() {
+     this.isRetry = true;
+   }
+   
+   /**
+    * This returns true if the message has been marked as having been previously
+    * transmitted to a different server.
+    */
+   public boolean isRetry() {
+     return this.isRetry;
+   }
+ 
+   /*Sets size for HDOS chunk.*/
+   public void setChunkSize(int chunkSize) {
+     this.chunkSize = chunkSize;
+   }
+   
+   /**
+    * When building a Message this will return the number of the
+    * next Part to be added to the message
+    */
+   public int getNextPartNumber() {
+     return this.currentPart;
+   }
+ 
+   public void addStringPart(String str) {
+     if (str==null) {
+       addRawPart((byte[])null, false);
+     }
+     else {
+       HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+       this.messageModified = true;
+       Part part = partsList[this.currentPart];
+       part.setPartState(hdos, false);
+       this.currentPart++;
+     }
+   }
+ 
+   /*
+    * Adds a new part to this message that contains a <code>byte</code>
+    * array (as opposed to a serialized object).
+    *
+    * @see #addPart(byte[], boolean)
+    */
+   public void addBytesPart(byte[] newPart) {
+     addRawPart(newPart, false);
+   }
+ 
+   public void addStringOrObjPart(Object o) {
+     if (o instanceof String || o == null) {
+       addStringPart((String)o);
+     } else {
+       // Note even if o is a byte[] we need to serialize it.
+       // This could be cleaned up but it would require C client code to change.
+       serializeAndAddPart(o, false);
+     }
+   }
+ 
+   public void addDeltaPart(HeapDataOutputStream hdos) {
+     this.messageModified = true;
+     Part part = partsList[this.currentPart];
+     part.setPartState(hdos, false);
+     this.currentPart++;
+   }
+ 
+   public void addObjPart(Object o) {
+     addObjPart(o, false);
+   }
+   /**
+    * Like addObjPart(Object) but also prefers to reference
+    * objects in the part instead of copying them into a byte buffer.
+    */
+   public void addObjPartNoCopying(Object o) {
+     if (o == null || o instanceof byte[]) {
+       addRawPart((byte[])o, false);
+     } else {
+       serializeAndAddPartNoCopying(o);
+     }
+   }
+   public void addObjPart(Object o, boolean zipValues) {
+     if (o == null || o instanceof byte[]) {
+       addRawPart((byte[])o, false);
+     } else {
+       serializeAndAddPart(o, zipValues);
+     }
+   }
+   public void addPartInAnyForm(@Unretained Object o, boolean isObject) {
+     if (o == null) {
+       addRawPart((byte[])o, false);
+     } else if (o instanceof byte[]) {
+       addRawPart((byte[])o, isObject);
+     } else if (o instanceof StoredObject) {
+       // It is possible it is an off-heap StoredObject that contains a simple non-object byte[].
+       this.messageModified = true;
+       Part part = partsList[this.currentPart];
+       part.setPartState((StoredObject)o, isObject);
+       this.currentPart++;
+     } else {
+       serializeAndAddPart(o, false);
+     }
+   }
+   
+   private void serializeAndAddPartNoCopying(Object o) {
+     HeapDataOutputStream hdos;
+     Version v = version;
+     if (version.equals(Version.CURRENT)){
+       v = null;
+     }
+     // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources passed to it.
+     hdos = new HeapDataOutputStream(chunkSize, v, true);
+     // TODO OFFHEAP: Change Part to look for an HDOS and just pass a reference to its DirectByteBuffer.
+     // Then change HDOS sendTo(SocketChannel...) to use the GatheringByteChannel to write a bunch of bbs.
+     // TODO OFFHEAP This code optimizes one part which works pretty good for getAll since all the values are
+     // returned in one part. But the following seems even better...
+     // BETTER: change Message to consolidate all the part hdos bb lists into a single bb array and have it do the GatheringByteChannel write.
+     // Message can use slice for the small parts (msg header and part header) that are not in the parts data (its a byte array, Chunk, or HDOS).
+     // EVEN BETTER: the message can have a single HDOS which owns a direct comm buffer. It can reserve space if it does not yet know the value to write (for example the size of the message or part).
+     // If we write something to the HDOS that is direct then it does not need to be copied.
+     // But large heap byte arrays will need to be copied to the hdos (the socket write does this anyway).
+     // If the direct buffer is full then we can allocate another one. If a part is already in a heap byte array
+     // then we could defer copying it by slicing the current direct bb and then adding the heap byte array
+     // as bb using ByteBuffer.wrap. Once we have all the data in the HDOS we can finally generate the header
+     // and then start working on sending the ByteBuffers to the channel. If we have room in a direct bb then
+     // we can copy a heap bb to it. Otherwise we can write the bb ahead of it which would free up room to copy
+     // the heap bb to the existing direct bb without needing to allocate extra direct bbs.
+     // Delaying the flush uses more direct memory but reduces the number of system calls.
+     try {
+       BlobHelper.serializeTo(o, hdos);
+     } catch (IOException ex) {
+       throw new SerializationException("failed serializing object", ex);
+     }
+     this.messageModified = true;
+     Part part = partsList[this.currentPart];
+     part.setPartState(hdos, true);
+     this.currentPart++;
+     
+   }
+ 
+   private void serializeAndAddPart(Object o, boolean zipValues) {
+     if (zipValues) {
+       throw new UnsupportedOperationException("zipValues no longer supported");    
+       
+     } else {
+       HeapDataOutputStream hdos;
+       Version v = version;
+       if (version.equals(Version.CURRENT)){
+         v = null;
+       }
+       hdos = new HeapDataOutputStream(chunkSize, v);
+       try {
+         BlobHelper.serializeTo(o, hdos);
+       } catch (IOException ex) {
+         throw new SerializationException("failed serializing object", ex);
+       }
+       this.messageModified = true;
+       Part part = partsList[this.currentPart];
+       part.setPartState(hdos, true);
+       this.currentPart++;
+     }
+   }
+ 
+   public void addIntPart(int v) {
+     this.messageModified = true;
+     Part part = partsList[this.currentPart];
+     part.setInt(v);
+     this.currentPart++;
+   }
+   
+   public void addLongPart(long v) {
+     this.messageModified = true;
+     Part part = partsList[this.currentPart];
+     part.setLong(v);
+     this.currentPart++;
+   }
+   
+   /**
+    * Adds a new part to this message that may contain a serialized
+    * object.
+    */
+   public void addRawPart(byte[] newPart,boolean isObject) {
+     this.messageModified = true;
+     Part part = partsList[this.currentPart];
+     part.setPartState(newPart, isObject);
+     this.currentPart++;
+   }
+ 
+   public int getMessageType() {
+     return this.msgType;
+   }
+ 
+   public int getPayloadLength() {
+     return this.payloadLength;
+   }
+ 
+   public int getHeaderLength() {
+     return FIXED_LENGTH;
+   }
+ 
+   public int getNumberOfParts() {
+     return this.numberOfParts;
+   }
+ 
+   public int getTransactionId() {
+     return this.transactionId;
+   }
+   
+   public Part getPart(int index) {
+     if (index < this.numberOfParts) {
+       Part p = partsList[index];
+       if (this.version != null) {
+         p.setVersion(this.version);
+       }
+       return p;
+     }
+     return null;
+   }
+ 
+   public static ByteBuffer setTLCommBuffer(ByteBuffer bb) {
+     ByteBuffer result = tlCommBuffer.get();
+     tlCommBuffer.set(bb);
+     return result;
+   }
+ 
+   public ByteBuffer getCommBuffer() {
+     if (this.cachedCommBuffer != null) {
+       return this.cachedCommBuffer;
+     }
+     else {
+       return tlCommBuffer.get();
+     }
+   }
+ 
+   public void clear() {
+     this.isRetry = false;
+     int len = this.payloadLength;
+     if (len != 0) {
+       this.payloadLength = 0;
+     }
+     if (this.hdrRead) {
+       if (this.msgStats != null) {
+         this.msgStats.decMessagesBeingReceived(len);
+       }
+     }
+     ByteBuffer buffer = getCommBuffer();
+     if (buffer != null) {
+       buffer.clear();
+     }
+     clearParts();
+     if (len != 0 && this.dataLimiter != null) {
+       this.dataLimiter.release(len);
+       this.dataLimiter = null;
+       this.maxIncomingMessageLength = 0;
+     }
+     if (this.hdrRead) {
+       if (this.msgLimiter != null) {
+         this.msgLimiter.release(1);
+         this.msgLimiter = null;
+       }
+       this.hdrRead = false;
+     }
+     this.flags = 0;
+   }
+ 
+   protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) {
+     //TODO:hitesh setting second bit of flags byte for client 
+     //this is not require but this makes all changes easily at client side right now
+     //just see this bit and process security header
+     byte flagsByte = this.flags;
+     if (isSecurityHeader) {
+       flagsByte |= MESSAGE_HAS_SECURE_PART;
+     }
+     if (this.isRetry) {
+       flagsByte |= MESSAGE_IS_RETRY;
+     }
+     getCommBuffer()
+       .putInt(this.msgType)
+       .putInt(msgLen)
+       .putInt(this.numberOfParts)
+       .putInt(this.transactionId)
+       .put(flagsByte);
+   }
+ 
+   protected Part getSecurityPart() {
+     if (this.sc != null ) {
+       //look types right put get etc
+      return this.sc.updateAndGetSecurityPart(); 
+     }
+     return null;
+   }
+ 
+   public void setSecurePart(byte[] bytes) {
+     this.securePart = new Part();
+     this.securePart.setPartState(bytes, false);
+   }
+ 
+   public void setMetaRegion(boolean isMetaRegion) {
+     this.isMetaRegion = isMetaRegion;
+   }
+ 
+   public boolean getAndResetIsMetaRegion() {
+     boolean isMetaRegion = this.isMetaRegion;
+     this.isMetaRegion = false;
+     return isMetaRegion;
+   }
+ 
+   /**
+    * Sends this message out on its socket.
+    */
+   protected void sendBytes(boolean clearMessage) throws IOException {
+     if (this.sc != null) {
+       // Keep track of the fact that we are making progress.
+       this.sc.updateProcessingMessage();
+     }
+     if (this.socket != null) {
+       final ByteBuffer cb = getCommBuffer();
+       if (cb == null) {
+         throw new IOException("No buffer");
+       }
++      int msgLen = 0;
+       synchronized(cb) {
 -        int numOfSecureParts = 0;
 -        Part securityPart = this.getSecurityPart();
 -        boolean isSecurityHeader = false;
++        long totalPartLen = 0;
++        long headerLen = 0;
++        int partsToTransmit = this.numberOfParts;
+         
 -        if (securityPart != null) {
 -          isSecurityHeader = true;
 -          numOfSecureParts = 1;
 -        }
 -        else if (this.securePart != null) {
 -          // This is a client sending this message.
 -          securityPart = this.securePart;
 -          isSecurityHeader = true;
 -          numOfSecureParts = 1;          
 -        }
 -
 -        int totalPartLen = 0;
 -        for (int i=0;i<this.numberOfParts;i++){
++        for (int i=0; i < this.numberOfParts; i++) {
+           Part part = this.partsList[i];
++          headerLen += PART_HEADER_SIZE;
+           totalPartLen += part.getLength();
+         }
+ 
 -        if(numOfSecureParts == 1) {
++        Part securityPart = this.getSecurityPart();
++        if (securityPart == null) {
++          securityPart = this.securePart;
++        }
++        if (securityPart != null) {
++          headerLen += PART_HEADER_SIZE;
+           totalPartLen += securityPart.getLength();
++          partsToTransmit++;
+         }
 -        int msgLen = (PART_HEADER_SIZE * (this.numberOfParts + numOfSecureParts)) + totalPartLen;
++
++        if ( (headerLen + totalPartLen) > Integer.MAX_VALUE ) {
++          throw new MessageTooLargeException("Message size (" + (headerLen + totalPartLen) 
++              + ") exceeds maximum integer value");
++        }
++        
++        msgLen = (int)(headerLen + totalPartLen);
++        
++        if (msgLen > MAX_MESSAGE_SIZE) {
++          throw new MessageTooLargeException("Message size(" + msgLen
++              + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")");
++        }
++        
+         cb.clear();
 -        packHeaderInfoForSending(msgLen, isSecurityHeader);
 -        for (int i=0;i<this.numberOfParts + numOfSecureParts;i++) {
 -          Part part = null;
 -          if(i == this.numberOfParts) {
 -            part = securityPart;
 -          }
 -          else {
 -            part = partsList[i];
 -          }
++        packHeaderInfoForSending(msgLen, (securityPart != null));
++        for (int i=0; i < partsToTransmit; i++) {
++          Part part = (i == this.numberOfParts) ? securityPart : partsList[i];
++
+           if (cb.remaining() < PART_HEADER_SIZE) {
+             flushBuffer();
+           }
++          
+           int partLen = part.getLength();
+           cb.putInt(partLen);
+           cb.put(part.getTypeCode());
+           if (partLen <= cb.remaining()) {
 -            part.sendTo(cb);
++            part.writeTo(cb);
+           } else {
+             flushBuffer();
 -            // send partBytes
+             if (this.sockCh != null) {
 -              part.sendTo(this.sockCh, cb);
++              part.writeTo(this.sockCh, cb);
+             } else {
 -              part.sendTo(this.os, cb);
++              part.writeTo(this.os, cb);
+             }
+             if (this.msgStats != null) {
+               this.msgStats.incSentBytes(partLen);
+             }
+           }
+         }
+         if (cb.position() != 0) {
+           flushBuffer();
+         }
+         this.messageModified = false;
+         if (this.sockCh == null) {
+           this.os.flush();
+         }
+       }
+       if(clearMessage) {
+         clearParts();
+       }
+     }
+     else {
+       throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
+     }
+   }
+ 
+   protected void flushBuffer() throws IOException {
+     final ByteBuffer cb = getCommBuffer();
+     if (this.sockCh != null) {
+       cb.flip();
+       do {
+         this.sockCh.write(cb);
+       } while (cb.remaining() > 0);
+     } else {
+       this.os.write(cb.array(), 0, cb.position());
+     }
+     if (this.msgStats != null) {
+       this.msgStats.incSentBytes(cb.position());
+     }
+     cb.clear();
+   }
+ 
+   private void read()
+   throws IOException {
+     clearParts();
+     //TODO:Hitesh ??? for server changes make sure sc is not null as this class also used by client :(
+     readHeaderAndPayload();
+   }
+ 
+   /**
+    * Read the actual bytes of the header off the socket
+    */
+   protected final void fetchHeader() throws IOException {
+     final ByteBuffer cb = getCommBuffer();
+     cb.clear();
+     // msgType is invalidated here and can be used as an indicator
+     // of problems reading the message
+     this.msgType = MessageType.INVALID;
+ 
+     int hdr = 0;
+ 
+     final int headerLength = getHeaderLength();
+     if (this.sockCh != null) {
+       cb.limit(headerLength);
+       do {
+         int bytesRead = this.sockCh.read(cb);
+         //System.out.println("DEBUG: fetchHeader read " + bytesRead + " bytes commBuffer=" + cb);
+         if (bytesRead == -1) {
+           throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER.toLocalizedString());
+         }
+         if (this.msgStats != null) {
+           this.msgStats.incReceivedBytes(bytesRead);
+         }
+       } while (cb.remaining() > 0);
+       cb.flip();
+     } else {
+       do {
+         int bytesRead = -1;
+         try {
+           bytesRead = this.is.read(cb.array(),hdr, headerLength-hdr);
+         }
+         catch (SocketTimeoutException e) {
+ //          bytesRead = 0;
+           // TODO add a cancellation check
+           throw e;
+         }
+         if (bytesRead == -1) {
+           throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER.toLocalizedString());
+         }
+         hdr += bytesRead;
+         if (this.msgStats != null) {
+           this.msgStats.incReceivedBytes(bytesRead);
+         }
+       } while (hdr < headerLength);
+ 
+       // now setup the commBuffer for the caller to parse it
+       cb.rewind();
+     }
+   }
+ 
+   private void readHeaderAndPayload()
+   throws IOException {
+     //TODO:Hitesh ???
+     fetchHeader();
+     final ByteBuffer cb = getCommBuffer();
+     final int type = cb.getInt();
+     final int len = cb.getInt();
+     final int numParts = cb.getInt();
+     final int txid = cb.getInt();
+     byte bits = cb.get();
+     cb.clear();
+ 
+     if (!MessageType.validate(type)) {
+       throw new IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER.toLocalizedString(Integer.valueOf(type)));
+     }
+     int timeToWait = 0;
+     if (this.sc != null) {
+       // Keep track of the fact that a message is being processed.
+       this.sc.setProcessingMessage();
+       timeToWait = sc.getClientReadTimeout();
+     }
+     this.hdrRead = true;
+     if (this.msgLimiter != null) {
+         for (;;) {
+           this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+           boolean interrupted = Thread.interrupted();
+           try {
+             if (timeToWait == 0) {
+               this.msgLimiter.acquire(1);
+             } 
+             else {
+               if (!this.msgLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) {
+                 if (this.msgStats != null
+                     && this.msgStats instanceof CacheServerStats) {
+                   ((CacheServerStats)this.msgStats).incConnectionsTimedOut();
+                 }
+                 throw new IOException(LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_MESSAGE_LIMITER_AFTER_WAITING_0_MILLISECONDS.toLocalizedString(Integer.valueOf(timeToWait)));
+               }
+             }
+             break;
+           }
+           catch (InterruptedException e) {
+             interrupted = true;
+           }
+           finally {
+             if (interrupted) {
+               Thread.currentThread().interrupt();
+             }
+           }
+         } // for
+     }
+     if (len > 0) {
+       if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) {
+         throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1.toLocalizedString(new Object[] {Integer.valueOf(len), Integer.valueOf(this.maxIncomingMessageLength)}));
+       }
+       if (this.dataLimiter != null) {
+         for (;;) {
+           if (sc != null) {
+             this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+           }
+           boolean interrupted = Thread.interrupted();
+           try {
+             if (timeToWait == 0) {
+               this.dataLimiter.acquire(len);
+             } 
+             else {
+               int newTimeToWait = timeToWait;
+               if (this.msgLimiter != null) {
+                 // may have waited for msg limit so recalc time to wait
+                 newTimeToWait -= (int)sc.getCurrentMessageProcessingTime();
+               }
+               if (newTimeToWait <= 0 || !this.msgLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) {
+                 throw new IOException(LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_DATA_LIMITER_AFTER_WAITING_0_MILLISECONDS.toLocalizedString(timeToWait));
+               }
+             }
+             this.payloadLength = len; // makes sure payloadLength gets set now so we will release the semaphore
+             break; // success
+           }
+           catch (InterruptedException e) {
+             interrupted = true;
+           }
+           finally {
+             if (interrupted) {
+               Thread.currentThread().interrupt();
+             }
+           }
+         }
+       }
+     }
+     if (this.msgStats != null) {
+       this.msgStats.incMessagesBeingReceived(len);
+       this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear
+     }
+     
+     this.isRetry = (bits & MESSAGE_IS_RETRY) != 0;
+     bits = (byte)(bits & MESSAGE_IS_RETRY_MASK);
+     this.flags = bits;
+     // TODO why is the msgType set twice, here and after reading the payload fields?
+     this.msgType = type;
+ 
+     readPayloadFields(numParts, len);
+ 
+     // Set the header and payload fields only after receiving all the
+     // socket data, providing better message consistency in the face
+     // of exceptional conditions (e.g. IO problems, timeouts etc.)
+     this.msgType = type;
+     this.payloadLength = len;
+     // this.numberOfParts = numParts;  Already set in setPayloadFields via setNumberOfParts
+     this.transactionId = txid;
+     this.flags = bits;
+     if (this.sc != null) {
+       // Keep track of the fact that a message is being processed.
+       this.sc.updateProcessingMessage();
+     }
+   }
+ 
+   protected void readPayloadFields(final int numParts, final int len)
+   throws IOException {
+     //TODO:Hitesh
+     if (len > 0 && numParts <= 0 ||
+         len <= 0 && numParts > 0) {
+       throw new IOException(LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT.toLocalizedString(
+             new Object[] {Integer.valueOf(len), Integer.valueOf(numParts)}));
+     }
+ 
+     Integer msgType = messageType.get();
+     if (msgType != null && msgType == MessageType.PING) {
+       messageType.set(null); // set it to null right away.
+       int pingParts = 10; // Some number which will not throw OOM but still be acceptable for a ping operation.
+       if (numParts > pingParts) {
+         throw new IOException("Part length ( " + numParts
+             + " ) is  inconsistent for " + MessageType.getString(msgType)
+             + " operation.");
+       }
+     }
+     setNumberOfParts(numParts);
+     if (numParts <= 0)
+       return;
+   
+     if (len < 0) {
+       logger.info(LocalizedMessage.create(LocalizedStrings.Message_RPL_NEG_LEN__0, len));
+       throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
+     }    
+     
+     final ByteBuffer cb = getCommBuffer();
+     cb.clear();
+     cb.flip();
+ 
+     int readSecurePart = 0;
+     //TODO:Hitesh look if securePart can be cached here
+     readSecurePart = checkAndSetSecurityPart();
+     
+     int bytesRemaining = len;
+     for (int i = 0; ((i < numParts + readSecurePart) || ((readSecurePart == 1) && (cb
+         .remaining() > 0))); i++) {
+       int bytesReadThisTime = readPartChunk(bytesRemaining);
+       bytesRemaining -= bytesReadThisTime;
+ 
+       Part part;
+       
+       if(i < numParts) {
+         part = this.partsList[i];
+       }
+       else {
+         part = this.securePart;
+       }
+       
+       int partLen = cb.getInt();
+       byte partType = cb.get();
+       byte[] partBytes = null;
+       if (partLen > 0) {
+         partBytes = new byte[partLen];
+         int alreadyReadBytes = cb.remaining();
+         if (alreadyReadBytes > 0) {
+           if (partLen < alreadyReadBytes) {
+             alreadyReadBytes = partLen;
+           }
+           cb.get(partBytes, 0, alreadyReadBytes);
+         }
+         // now we need to read partLen - alreadyReadBytes off the wire
+         int off = alreadyReadBytes;
+         int remaining = partLen - off;
+         while (remaining > 0) {
+           if (this.sockCh != null) {
+             int bytesThisTime = remaining;
+             cb.clear();
+             if (bytesThisTime > cb.capacity()) {
+               bytesThisTime = cb.capacity();
+             }
+             cb.limit(bytesThisTime);
+             int res = this.sockCh.read(cb);
+             if (res != -1) {
+               cb.flip();
+               bytesRemaining -= res;
+               remaining -= res;
+               cb.get(partBytes, off, res);
+               off += res;
+               if (this.msgStats != null) {
+                 this.msgStats.incReceivedBytes(res);
+               }
+             } else {
+               throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_A_PART.toLocalizedString());
+             }
+           } else {
+             int res = 0;
+             try {
+               res = this.is.read(partBytes, off, remaining);
+             }
+             catch (SocketTimeoutException e) {
+               // TODO: add cancellation check
+               throw e;
+             }
+             if (res != -1) {
+               bytesRemaining -= res;
+               remaining -= res;
+               off += res;
+               if (this.msgStats != null) {
+                 this.msgStats.incReceivedBytes(res);
+               }
+             } else {
+               throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_A_PART.toLocalizedString());
+             }
+           }
+         }
+       }
+       part.init(partBytes, partType);
+     }
+   }
+ 
+   protected int checkAndSetSecurityPart() {
+     if ((this.flags | MESSAGE_HAS_SECURE_PART) == this.flags) {
+       this.securePart = new Part();
+       return 1;
+     }
+     else {
+       this.securePart = null;
+       return 0;
+     }
+   }
+ 
+   /**
+    * @param bytesRemaining the most bytes we can read
+    * @return the number of bytes read into commBuffer
+    */
+   private int readPartChunk(int bytesRemaining) throws IOException {
+     final ByteBuffer cb = getCommBuffer();
+     if (cb.remaining() >= PART_HEADER_SIZE) {
+       // we already have the next part header in commBuffer so just return
+       return 0;
+     }
+     if (cb.position() != 0) {
+       cb.compact();
+     } else {
+       cb.position(cb.limit());
+       cb.limit(cb.capacity());
+     }
+     int bytesRead = 0;
+     if (this.sc != null) {
+       // Keep track of the fact that we are making progress
+       this.sc.updateProcessingMessage();
+     }
+     if (this.sockCh != null) {
+       int remaining = cb.remaining();
+       if (remaining > bytesRemaining) {
+         remaining = bytesRemaining;
+         cb.limit(cb.position()+bytesRemaining);
+       }
+       while (remaining > 0) {
+         int res = this.sockCh.read(cb);
+         if (res != -1) {
+           remaining -= res;
+           bytesRead += res;
+           if (this.msgStats != null) {
+             this.msgStats.incReceivedBytes(res);
+           }
+         } else {
+           throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_PAYLOAD.toLocalizedString());
+         }
+       }
+ 
+     } else {
+       int bufSpace = cb.capacity() - cb.position();
+       int bytesToRead = bufSpace;
+       if (bytesRemaining < bytesToRead) {
+         bytesToRead = bytesRemaining;
+       }
+       int pos = cb.position();
+       while (bytesToRead > 0) {
+         int res = 0;
+         try {
+           res = this.is.read(cb.array(), pos, bytesToRead);
+         }
+         catch (SocketTimeoutException e) {
+           // TODO add a cancellation check
+           throw e;
+         }
+         if (res != -1) {
+           bytesToRead -= res;
+           pos += res;
+           bytesRead += res;
+           if (this.msgStats != null) {
+             this.msgStats.incReceivedBytes(res);
+           }
+         } else {
+           throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_PAYLOAD.toLocalizedString());
+         }
+       }
+       cb.position(pos);
+     }
+     cb.flip();
+     return bytesRead;
+   }
+ 
+   /**
+    * Gets rid of all the parts that have been added to this message.
+    */
+   public void clearParts() {
+     for (int i=0; i< partsList.length; i++){
+       partsList[i].clear();
+     }
+     this.currentPart=0;
+   }
+ 
+   @Override
+   public String toString() {
+     StringBuffer sb = new StringBuffer();
+     sb.append("type=").append(MessageType.getString(msgType));
+     sb.append("; payloadLength=").append(payloadLength);
+     sb.append("; numberOfParts=").append(numberOfParts);
+     sb.append("; transactionId=").append(transactionId);
+     sb.append("; currentPart=").append(currentPart);
+     sb.append("; messageModified=").append(messageModified);
+     sb.append("; flags=").append(Integer.toHexString(flags));
+     for (int i = 0; i < numberOfParts; i ++) {
+       sb.append("; part[").append(i).append("]={");
+       sb.append(this.partsList[i].toString());
+       sb.append("}");
+     }
+     return sb.toString();
+   }
+ 
+   
+   public void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
+     this.sc = sc;
+     setComms(socket, bb, msgStats);
+   }
+ 
+   public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
+     this.sockCh = socket.getChannel();
+     if (this.sockCh == null) {
+       setComms(socket, SocketUtils.getInputStream(socket), SocketUtils.getOutputStream(socket), bb, msgStats);
+     } else {
+       setComms(socket, null, null,  bb, msgStats);
+     }
+   }
+   
+   public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb, MessageStats msgStats)
+     throws IOException
+   {
+     Assert.assertTrue(socket != null);
+     this.socket = socket;
+     this.sockCh = socket.getChannel();
+     this.is = is;
+     this.os = os;
+     this.cachedCommBuffer = bb;
+     this.msgStats = msgStats;
+   }
+   /**
+    * Undo any state changes done by setComms.
+    * @since 5.7
+    */
+   public void unsetComms() {
+     this.socket = null;
+     this.sockCh = null;
+     this.is = null;
+     this.os = null;
+     this.cachedCommBuffer = null;
+     this.msgStats = null;
+   }
+ 
+   /**
+    * Sends this message to its receiver over its
+    * setOutputStream?? output stream.
+    */
+   public void send()
+   throws IOException {
+     send(true);
+   }
+   
+   public void send(ServerConnection servConn)
+   throws IOException {
+     if (this.sc != servConn) throw new IllegalStateException("this.sc was not correctly set");
+     send(true);
+   }
+   
+   /**
+    * Sends this message to its receiver over its
+    * setOutputStream?? output stream.
+    */
+   public void send(boolean clearMessage)
+   throws IOException {
+     sendBytes(clearMessage);
+   }
+ 
+   /**
+    *  Populates the stats of this <code>Message</code> with information
+    *  received via its socket
+    */
+   public void recv()
+   throws IOException {
+     if (this.socket != null) {
+       synchronized(getCommBuffer()) {
+         read();
+       }
+     }
+     else {
+       throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
+     }
+   }
+   public void recv(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter, Semaphore msgLimiter)
+   throws IOException {
+     this.sc = sc;
+     this.maxIncomingMessageLength = maxMessageLength;
+     this.dataLimiter = dataLimiter;
+     this.msgLimiter = msgLimiter;
+     recv();
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageTooLargeException.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageTooLargeException.java
index 0000000,0000000..e5cac59
new file mode 100755
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageTooLargeException.java
@@@ -1,0 -1,0 +1,29 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements.  See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License.  You may obtain a copy of the License at
++ *
++ *      http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.cache.tier.sockets;
++
++import java.io.IOException;
++
++public class MessageTooLargeException extends IOException {
++
++  private static final long serialVersionUID = -8970585803331525833L;
++  
++  public MessageTooLargeException(String message) {
++    super(message);
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
index 0000000,f5f6326..80b5c0a
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
@@@ -1,0 -1,452 +1,452 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package com.gemstone.gemfire.internal.cache.tier.sockets;
+ 
+ import com.gemstone.gemfire.internal.*;
+ import com.gemstone.gemfire.internal.cache.CachedDeserializable;
 -import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.DataAsAddress;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+ 
+ import java.io.*;
+ import java.nio.*;
+ import java.nio.channels.*;
+ 
+ /**
+  * Represents one unit of information (essentially a <code>byte</code>
+  * array) in the wire protocol.  Each server connection runs in its
+  * own thread to maximize concurrency and improve response times to
+  * edge requests
+  *
+  * @see Message
+  *
+  * @author Sudhir Menon
+  * @since 2.0.2
+  */
+ public class Part {
+   private static final byte BYTE_CODE = 0;
+   private static final byte OBJECT_CODE = 1;
+   
+   private Version version;
+   /**
+    * Used to represent and empty byte array for bug 36279
+    * @since 5.1
+    */
+   private static final byte EMPTY_BYTEARRAY_CODE = 2;
+   private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+ 
+   /** The payload of this part.
+    * Could be null, a byte[] or a HeapDataOutputStream on the send side.
+    * Could be null, or a byte[] on the receiver side.
+    */
+   private Object part;
+ 
+   /** Is the payload (<code>part</code>) a serialized object? */
+   private byte typeCode;
+ 
+   public void init(byte[] v, byte tc) {
+     if (tc == EMPTY_BYTEARRAY_CODE) {
+       this.part = EMPTY_BYTE_ARRAY;
+     }
+     else {
+       this.part = v;
+     }
+     this.typeCode = tc;
+   }
+ 
+ 
+   public void clear() {
+     this.part = null;
+     this.typeCode = BYTE_CODE;
+   }
+ 
+   public boolean isNull() {
+     if (this.part == null) {
+       return true;
+     }
+     if (isObject() && this.part instanceof byte[]) {
+       byte[] b = (byte[])this.part;
+       if (b.length == 1 && b[0] == DSCODE.NULL) {
+         return true;
+       }
+     }
+     return false;
+   }
+   public boolean isObject() {
+     return this.typeCode == OBJECT_CODE;
+   }
+   public boolean isBytes() {
+     return this.typeCode == BYTE_CODE || this.typeCode == EMPTY_BYTEARRAY_CODE;
+   }
+ 
+   public void setPartState(byte[] b, boolean isObject) {
+     if (isObject) {
+       this.typeCode = OBJECT_CODE;
+     } else if (b != null && b.length == 0) {
+       this.typeCode = EMPTY_BYTEARRAY_CODE;
+       b = EMPTY_BYTE_ARRAY;
+     } else {
+       this.typeCode = BYTE_CODE;
+     }
+     this.part = b;
+   }
+   
+   public void setPartState(HeapDataOutputStream os, boolean isObject) {
+     if (isObject) {
+       this.typeCode = OBJECT_CODE;
+       this.part = os;
+     } else if (os != null && os.size() == 0) {
+       this.typeCode = EMPTY_BYTEARRAY_CODE;
+       this.part = EMPTY_BYTE_ARRAY;
+     } else {
+       this.typeCode = BYTE_CODE;
+       this.part = os;
+     }
+   }
+   public void setPartState(StoredObject so, boolean isObject) {
+     if (isObject) {
+       this.typeCode = OBJECT_CODE;
+     } else if (so.getValueSizeInBytes() == 0) {
+       this.typeCode = EMPTY_BYTEARRAY_CODE;
+       this.part = EMPTY_BYTE_ARRAY;
+       return;
+     } else {
+       this.typeCode = BYTE_CODE;
+     }
+     if (so instanceof DataAsAddress) {
+       this.part = ((DataAsAddress)so).getRawBytes();
+     } else {
 -      this.part = (Chunk)so;
++      this.part = (ObjectChunk)so;
+     }
+   }
+   public byte getTypeCode() {
+     return this.typeCode;
+   }
+   /**
+    * Return the length of the part. The length is the number of bytes needed
+    * for its serialized form.
+    */
+   public int getLength() {
+     if (this.part == null) {
+       return 0;
+     } else if (this.part instanceof byte[]) {
+       return ((byte[])this.part).length;
 -    } else if (this.part instanceof Chunk) {
 -      return ((Chunk) this.part).getValueSizeInBytes();
++    } else if (this.part instanceof ObjectChunk) {
++      return ((ObjectChunk) this.part).getValueSizeInBytes();
+     } else {
+       return ((HeapDataOutputStream)this.part).size();
+     }
+   }
+   public String getString() {
+     if (this.part == null) {
+       return null;
+     }
+     if (!isBytes()) {
+       Assert.assertTrue(false, "expected String part to be of type BYTE, part ="
+           + this.toString());
+     }
+     return CacheServerHelper.fromUTF((byte[])this.part);
+   }
+   
+   public int getInt() {
+     if (!isBytes()) {
+       Assert.assertTrue(false, "expected int part to be of type BYTE, part = "
+           + this.toString()); 
+     }
+     if (getLength() != 4) {
+       Assert.assertTrue(false, 
+           "expected int length to be 4 but it was " + getLength()
+           + "; part = " + this.toString());
+     }
+     byte[] bytes = getSerializedForm();
+     return decodeInt(bytes, 0);
+   }
+ 
+   public static int decodeInt(byte[] bytes, int offset) {
+     return (((bytes[offset + 0]) << 24) & 0xFF000000)
+         | (((bytes[offset + 1]) << 16) & 0x00FF0000)
+         | (((bytes[offset + 2]) << 8) & 0x0000FF00)
+         | ((bytes[offset + 3]) & 0x000000FF);
+   }
+ 
+   public void setInt(int v) {
+     byte[] bytes = new byte[4];
+     encodeInt(v, bytes);
+     this.typeCode = BYTE_CODE;
+     this.part = bytes;
+   }
+ 
+   /**
+    * @since 5.7
+    */
+   public static void encodeInt(int v, byte[] bytes) {
+     encodeInt(v, bytes, 0);
+   }
+ 
+   public static void encodeInt(int v, byte[] bytes, int offset) {
+     // encode an int into the given byte array
+     bytes[offset + 0] = (byte) ((v & 0xFF000000) >> 24);
+     bytes[offset + 1] = (byte) ((v & 0x00FF0000) >> 16);
+     bytes[offset + 2] = (byte) ((v & 0x0000FF00) >> 8 );
+     bytes[offset + 3] = (byte) (v & 0x000000FF);
+   }
+   
+   public void setLong(long v) {
+     byte[] bytes = new byte[8];
+     bytes[0] = (byte) ((v & 0xFF00000000000000l) >> 56);
+     bytes[1] = (byte) ((v & 0x00FF000000000000l) >> 48);
+     bytes[2] = (byte) ((v & 0x0000FF0000000000l) >> 40);
+     bytes[3] = (byte) ((v & 0x000000FF00000000l) >> 32);
+     bytes[4] = (byte) ((v & 0x00000000FF000000l) >> 24);
+     bytes[5] = (byte) ((v & 0x0000000000FF0000l) >> 16);
+     bytes[6] = (byte) ((v & 0x000000000000FF00l) >>  8);
+     bytes[7] = (byte) (v & 0xFF);
+     this.typeCode = BYTE_CODE;
+     this.part = bytes;
+   }
+ 
+   public long getLong() {
+     if (!isBytes()) {
+       Assert.assertTrue(false, "expected long part to be of type BYTE, part = "
+           + this.toString()); 
+     }
+     if (getLength() != 8) {
+       Assert.assertTrue(false, 
+           "expected long length to be 8 but it was " + getLength()
+           + "; part = " + this.toString());
+     }
+     byte[] bytes = getSerializedForm();
+     return ((((long)bytes[0]) << 56) & 0xFF00000000000000l) |
+            ((((long)bytes[1]) << 48) & 0x00FF000000000000l) |
+            ((((long)bytes[2]) << 40) & 0x0000FF0000000000l) |
+            ((((long)bytes[3]) << 32) & 0x000000FF00000000l) |
+            ((((long)bytes[4]) << 24) & 0x00000000FF000000l) |
+            ((((long)bytes[5]) << 16) & 0x0000000000FF0000l) |
+            ((((long)bytes[6]) <<  8) & 0x000000000000FF00l) |
+            (        bytes[7]         & 0x00000000000000FFl);
+   }
+ 
+ 
+   public byte[] getSerializedForm() {
+     if (this.part == null) {
+       return null;
+     } else if (this.part instanceof byte[]) {
+       return (byte[])this.part;
+     } else {
+       return null; // should not be called on sender side?
+     }
+   }
+   public Object getObject(boolean unzip) throws IOException, ClassNotFoundException {
+     if (isBytes()) {
+       return this.part;
+     }
+     else {
+       if (this.version != null) {
+         return CacheServerHelper.deserialize((byte[])this.part, this.version,
+             unzip);
+       }
+       else {
+         return CacheServerHelper.deserialize((byte[])this.part, unzip);
+       }
+     }
+   }
+   public Object getObject() throws IOException, ClassNotFoundException {
+     return getObject(false);
+   }
+ 
+   public Object getStringOrObject() throws IOException, ClassNotFoundException {
+     if (isObject()) {
+       return getObject();
+     } else {
+       return getString();
+     }
+   }
+   
+   /**
+    * Write the contents of this part to the specified output stream.
+    * This is only called for parts that will not fit into the commBuffer
+    * so they need to be written directly to the stream.
+    * A stream is used because the client is configured for old IO (instead of nio).
+    * @param buf the buffer to use if any data needs to be copied to one
+    */
 -  public final void sendTo(OutputStream out, ByteBuffer buf) throws IOException {
++  public final void writeTo(OutputStream out, ByteBuffer buf) throws IOException {
+     if (getLength() > 0) {
+       if (this.part instanceof byte[]) {
+         byte[] bytes = (byte[])this.part;
+         out.write(bytes, 0, bytes.length);
 -      } else if (this.part instanceof Chunk) {
 -        Chunk c = (Chunk) this.part;
++      } else if (this.part instanceof ObjectChunk) {
++        ObjectChunk c = (ObjectChunk) this.part;
+         ByteBuffer cbb = c.createDirectByteBuffer();
+         if (cbb != null) {
+           HeapDataOutputStream.writeByteBufferToStream(out,  buf, cbb);
+         } else {
+           int bytesToSend = c.getDataSize();
+           long addr = c.getAddressForReading(0, bytesToSend);
+           while (bytesToSend > 0) {
+             if (buf.remaining() == 0) {
+               HeapDataOutputStream.flushStream(out,  buf);
+             }
+             buf.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
+             addr++;
+             bytesToSend--;
+           }
+         }
+       } else {
+         HeapDataOutputStream hdos = (HeapDataOutputStream)this.part;
+         hdos.sendTo(out, buf);
+         hdos.rewind();
+       }
+     }
+   }
+   /**
+    * Write the contents of this part to the specified byte buffer.
+    * Precondition: caller has already checked the length of this part
+    * and it will fit into "buf".
+    */
 -  public final void sendTo(ByteBuffer buf) {
++  public final void writeTo(ByteBuffer buf) {
+     if (getLength() > 0) {
+       if (this.part instanceof byte[]) {
+         buf.put((byte[])this.part);
 -      } else if (this.part instanceof Chunk) {
 -        Chunk c = (Chunk) this.part;
++      } else if (this.part instanceof ObjectChunk) {
++        ObjectChunk c = (ObjectChunk) this.part;
+         ByteBuffer bb = c.createDirectByteBuffer();
+         if (bb != null) {
+           buf.put(bb);
+         } else {
+           int bytesToSend = c.getDataSize();
+           long addr = c.getAddressForReading(0, bytesToSend);
+           while (bytesToSend > 0) {
+             buf.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
+             addr++;
+             bytesToSend--;
+           }
+         }
+       } else {
+         HeapDataOutputStream hdos = (HeapDataOutputStream)this.part;
+         hdos.sendTo(buf);
+         hdos.rewind();
+       }
+     }
+   }
+   /**
+    * Write the contents of this part to the specified socket channel
+    * using the specified byte buffer.
+    * This is only called for parts that will not fit into the commBuffer
+    * so they need to be written directly to the socket.
+    * Precondition: buf contains nothing that needs to be sent
+    */
 -  public final void sendTo(SocketChannel sc, ByteBuffer buf) throws IOException {
++  public final void writeTo(SocketChannel sc, ByteBuffer buf) throws IOException {
+     if (getLength() > 0) {
+       final int BUF_MAX = buf.capacity();
+       if (this.part instanceof byte[]) {
+         final byte[] bytes = (byte[])this.part;
+         int off = 0;
+         int len = bytes.length;
+         buf.clear();
+         while (len > 0) {
+           int bytesThisTime = len;
+           if (bytesThisTime > BUF_MAX) {
+             bytesThisTime = BUF_MAX;
+           }
+           buf.put(bytes, off, bytesThisTime);
+           len -= bytesThisTime;
+           off += bytesThisTime;
+           buf.flip();
+           while (buf.remaining() > 0) {
+             sc.write(buf);
+           }
+           buf.clear();
+         }
 -      } else if (this.part instanceof Chunk) {
++      } else if (this.part instanceof ObjectChunk) {
+         // instead of copying the Chunk to buf try to create a direct ByteBuffer and
+         // just write it directly to the socket channel.
 -        Chunk c = (Chunk) this.part;
++        ObjectChunk c = (ObjectChunk) this.part;
+         ByteBuffer bb = c.createDirectByteBuffer();
+         if (bb != null) {
+           while (bb.remaining() > 0) {
+             sc.write(bb);
+           }
+         } else {
+           int len = c.getDataSize();
+           long addr = c.getAddressForReading(0, len);
+           buf.clear();
+           while (len > 0) {
+             int bytesThisTime = len;
+             if (bytesThisTime > BUF_MAX) {
+               bytesThisTime = BUF_MAX;
+             }
+             len -= bytesThisTime;
+             while (bytesThisTime > 0) {
+               buf.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
+               addr++;
+               bytesThisTime--;
+             }
+             buf.flip();
+             while (buf.remaining() > 0) {
+               sc.write(buf);
+             }
+             buf.clear();
+           }
+         }
+       } else {
+         HeapDataOutputStream hdos = (HeapDataOutputStream)this.part;
+         hdos.sendTo(sc, buf);
+         hdos.rewind();
+       }
+     }
+   }
+   
+   static private String typeCodeToString(byte c) {
+     switch (c) {
+     case BYTE_CODE:
+       return "BYTE_CODE";
+     case OBJECT_CODE:
+       return "OBJECT_CODE";
+     case EMPTY_BYTEARRAY_CODE:
+       return "EMPTY_BYTEARRAY_CODE";
+     default:
+       return "unknown code " + c;
+     }
+   }
+ 
+   @Override
+   public String toString() {
+     StringBuffer sb = new StringBuffer();
+     sb.append("partCode=");
+     sb.append(typeCodeToString(this.typeCode));
+     sb.append(" partLength=" + getLength());
+ //    sb.append(" partBytes=");
+ //    byte[] b = getSerializedForm();
+ //    if (b == null) {
+ //      sb.append("null");
+ //    }
+ //    else {
+ //      sb.append("(");
+ //      for (int i = 0; i < b.length; i ++) {
+ //        sb.append(Integer.toString(b[i]));
+ //        sb.append(" ");
+ //      }
+ //      sb.append(")");
+ //    }
+     return sb.toString();
+   }
+ 
+   public void setVersion(Version clientVersion) {
+     this.version = clientVersion;
+   }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index 0000000,82e8114..1975601
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@@ -1,0 -1,209 +1,210 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+ 
+ import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
+ import com.gemstone.gemfire.cache.client.internal.TXSynchronizationOp.CompletionType;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.distributed.internal.ReplyException;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.internal.cache.TXCommitMessage;
+ import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+ import com.gemstone.gemfire.internal.cache.TXStateProxy;
+ import com.gemstone.gemfire.internal.cache.TXSynchronizationRunnable;
+ import com.gemstone.gemfire.internal.cache.tier.Command;
+ import com.gemstone.gemfire.internal.cache.tier.MessageType;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
++import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+ 
+ import java.io.IOException;
+ import java.util.concurrent.Executor;
+ 
+ import javax.transaction.Status;
+ 
+ public class TXSynchronizationCommand extends BaseCommand {
+ 
+   private final static TXSynchronizationCommand singleton = new TXSynchronizationCommand();
+ 
+   public static Command getCommand() {
+     return singleton;
+   }
+ 
+   /* (non-Javadoc)
+    * @see com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand#shouldMasqueradeForTx(com.gemstone.gemfire.internal.cache.tier.sockets.Message, com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection)
+    */
+   @Override
+   protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) {
+     // masquerading is done in the waiting thread pool
+     return false;
+   }
+ 
+   /* (non-Javadoc)
+    * @see com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand#cmdExecute(com.gemstone.gemfire.internal.cache.tier.sockets.Message, com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection, long)
+    */
+   @Override
+   public void cmdExecute(final Message msg, final ServerConnection servConn, long start)
+       throws IOException, ClassNotFoundException, InterruptedException {
+     
+     servConn.setAsTrue(REQUIRES_RESPONSE);
+ 
+     CompletionType type = CompletionType.values()[msg.getPart(0).getInt()];
+     /*int txIdInt =*/ msg.getPart(1).getInt();  // [bruce] not sure if we need to transmit this
+     final Part statusPart;
+     if (type == CompletionType.AFTER_COMPLETION) {
+       statusPart = msg.getPart(2);
+     } else {
+       statusPart = null;
+     }
+     
+     final TXManagerImpl txMgr = (TXManagerImpl)servConn.getCache().getCacheTransactionManager();
+     final InternalDistributedMember member = (InternalDistributedMember)servConn.getProxyID().getDistributedMember();
+ 
+     // get the tx state without associating it with this thread.  That's done later
+     final TXStateProxy txProxy = txMgr.masqueradeAs(msg, member, true);
+     
+     // we have to run beforeCompletion and afterCompletion in the same thread
+     // because beforeCompletion obtains locks for the thread and afterCompletion
+     // releases them
+     if (txProxy != null) {
+       final boolean isDebugEnabled = logger.isDebugEnabled();
+       try {
+         if (type == CompletionType.BEFORE_COMPLETION) {
+           Runnable beforeCompletion = new Runnable() {
+                 @SuppressWarnings("synthetic-access")
+                 public void run() {
+                   TXStateProxy txState = null;
+                   Throwable failureException = null;
+                   try {
+                     txState = txMgr.masqueradeAs(msg, member, false);
+                     if (isDebugEnabled) {
+                       logger.debug("Executing beforeCompletion() notification for transaction {}", msg.getTransactionId());
+                     }
+                     txState.setIsJTA(true);
+                     txState.beforeCompletion();
+                     try {
+                       writeReply(msg, servConn);
+                     } catch (IOException e) {
+                       if (isDebugEnabled) {
+                         logger.debug("Problem writing reply to client", e);
+                       }
+                     }
+                     servConn.setAsTrue(RESPONDED);
+                   } catch (ReplyException e) {
+                     failureException = e.getCause();
+                   } catch (InterruptedException e) {
+                     Thread.currentThread().interrupt();
+                   } catch (Exception e) {
+                     failureException = e;
+                   } finally {
+                     txMgr.unmasquerade(txState);
+                   }
+                   if (failureException != null) {
+                     try {
+                       writeException(msg, failureException, false, servConn);
+                     } catch (IOException ioe) {
+                       if (isDebugEnabled) {
+                         logger.debug("Problem writing reply to client", ioe);
+                       }
+                     }
+                     servConn.setAsTrue(RESPONDED);
+                   }
+                 }
+               };
+           TXSynchronizationRunnable sync = new TXSynchronizationRunnable(beforeCompletion);
+           txProxy.setSynchronizationRunnable(sync);
+           Executor exec = InternalDistributedSystem.getConnectedInstance().getDistributionManager().getWaitingThreadPool();
+           exec.execute(sync);
+           sync.waitForFirstExecution();
+         } else {
+           Runnable afterCompletion = new Runnable() {
+                 @SuppressWarnings("synthetic-access")
+                 public void run() {
+                   TXStateProxy txState = null;
+                   try {
+                     txState = txMgr.masqueradeAs(msg, member, false);
+                     int status = statusPart.getInt();
+                     if (isDebugEnabled) {
+                       logger.debug("Executing afterCompletion({}) notification for transaction {}", status, msg.getTransactionId());
+                     }
+                     txState.setIsJTA(true);
+                     txState.afterCompletion(status);
+                     // GemFire commits during afterCompletion - send the commit info back to the client
+                     // where it can be applied to the local cache
+                     TXCommitMessage cmsg = txState.getCommitMessage();
+                     try {
+                       CommitCommand.writeCommitResponse(cmsg, msg, servConn);
+                       txMgr.removeHostedTXState(txState.getTxId());
+                     } catch (IOException e) {
+                       // not much can be done here
 -                      if (isDebugEnabled) {
 -                        logger.debug("Problem writing reply to client", e);
++                      if (isDebugEnabled || (e instanceof MessageTooLargeException)) {
++                        logger.warn("Problem writing reply to client", e);
+                       }
+                     }
+                     servConn.setAsTrue(RESPONDED);
+                   } catch (RuntimeException e) {
+                     try {
+                       writeException(msg, e, false, servConn);
+                     } catch (IOException ioe) {
+                       if (isDebugEnabled) {
+                         logger.debug("Problem writing reply to client", ioe);
+                       }
+                     }
+                     servConn.setAsTrue(RESPONDED);
+                   } catch (InterruptedException e) {
+                     Thread.currentThread().interrupt();
+                   } finally {
+                     txMgr.unmasquerade(txState);
+                   }
+                 }
+               };
+           // if there was a beforeCompletion call then there will be a thread
+           // sitting in the waiting pool to execute afterCompletion.  Otherwise
+           // we have failed-over and may need to do beforeCompletion & hope that it works
+           TXSynchronizationRunnable sync = txProxy.getSynchronizationRunnable();
+           if (sync != null) {
+             sync.runSecondRunnable(afterCompletion);
+           } else {
+             if (statusPart.getInt() == Status.STATUS_COMMITTED) {
+               TXStateProxy txState = txMgr.masqueradeAs(msg, member, false);
+               try {
+                 if (isDebugEnabled) {
+                   logger.debug("Executing beforeCompletion() notification for transaction {} after failover", msg.getTransactionId());
+                 }
+                 txState.setIsJTA(true);
+                 txState.beforeCompletion();
+               } finally {
+                 txMgr.unmasquerade(txState);
+               }
+             }
+             afterCompletion.run();
+           }
+         }
+       } catch (Exception e) {
+         writeException(msg, MessageType.EXCEPTION, e, false, servConn);
+         servConn.setAsTrue(RESPONDED);
+       }
+       if (isDebugEnabled) {
+         logger.debug("Sent tx synchronization response");
+       }
+     }
+   }
+ 
+ }