You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/02/22 22:44:06 UTC
[078/100] [abbrv] incubator-geode git commit: GEODE-917: Merge branch
'feature/GEODE-917' into develop
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
index 0000000,4bfd44b..a6495e2
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Message.java
@@@ -1,0 -1,1100 +1,1116 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+ import java.io.EOFException;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.io.OutputStream;
+ import java.net.Socket;
+ import java.net.SocketTimeoutException;
+ import java.nio.ByteBuffer;
+ import java.nio.channels.SocketChannel;
+ import java.util.concurrent.Semaphore;
+ import java.util.concurrent.TimeUnit;
+
+ import org.apache.logging.log4j.Logger;
+
+ import com.gemstone.gemfire.SerializationException;
+ import com.gemstone.gemfire.internal.Assert;
+ import com.gemstone.gemfire.internal.HeapDataOutputStream;
+ import com.gemstone.gemfire.internal.SocketUtils;
+ import com.gemstone.gemfire.internal.Version;
+ import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+ import com.gemstone.gemfire.internal.cache.tier.MessageType;
+ import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+ import com.gemstone.gemfire.internal.logging.LogService;
+ import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.annotations.Unretained;
+ import com.gemstone.gemfire.internal.util.BlobHelper;
+
+ /**
+ * This class encapsulates the wire protocol. It provides accessors to
+ * encode and decode a message and serialize it out to the wire.
+ *
+ * <PRE>
+ * msgType - int - 4 bytes type of message, types enumerated below
+ *
+ * msgLength - int - 4 bytes total length of variable length payload
+ *
+ * numberOfParts - int - 4 bytes number of elements (LEN-BYTE* pairs)
+ * contained in the payload. Message can
+ * be a multi-part message
+ *
+ * transId - int - 4 bytes filled in by the requestor, copied back into
+ * the response
+ *
+ * flags - byte- 1 byte filled in by the requestor
+ * len1
+ * part1
+ * .
+ * .
+ * .
+ * lenn
+ * partn
+ * </PRE>
+ *
+ * We read the fixed-length 17-byte header (four 4-byte ints plus the 1-byte
+ * flags field) into the communication buffer, parse the header tokens from
+ * it, and use the information contained in there to read the payload.
+ *
+ * <P>
+ *
+ * See also <a href="package-summary.html#messages">package description</a>.
+ *
+ * @see com.gemstone.gemfire.internal.cache.tier.MessageType
+ *
+ */
+ public class Message {
+
++ /**
++ * maximum size of an outgoing message. See GEODE-478
++ */
++ static final int MAX_MESSAGE_SIZE = Integer.getInteger("gemfire.client.max-message-size", 1073741824).intValue();
++
+ private static final Logger logger = LogService.getLogger();
+
+ private static final int PART_HEADER_SIZE = 5; // 4 bytes for length, 1 byte for isObject
+
+ private static final int FIXED_LENGTH = 17;
+
+ private static final ThreadLocal<ByteBuffer> tlCommBuffer = new ThreadLocal<>();
+
+ protected int msgType;
+ protected int payloadLength=0;
+ protected int numberOfParts =0;
+ protected int transactionId = TXManagerImpl.NOTX;
+ protected int currentPart = 0;
+ protected Part[] partsList = null;
+ protected ByteBuffer cachedCommBuffer;
+ protected Socket socket = null;
+ protected SocketChannel sockCh = null;
+ protected OutputStream os = null;
+ protected InputStream is = null;
+ protected boolean messageModified = true;
+ /** is this message a retry of a previously sent message? */
+ protected boolean isRetry;
+ private byte flags = 0x00;
+ protected MessageStats msgStats = null;
+ protected ServerConnection sc = null;
+ private int maxIncomingMessageLength = -1;
+ private Semaphore dataLimiter = null;
+ // private int MAX_MSGS = -1;
+ private Semaphore msgLimiter = null;
+ private boolean hdrRead = false;
+ private int chunkSize = 1024;//Default Chunk Size.
+
+ protected Part securePart = null;
+ private boolean isMetaRegion = false;
+
+
+ // These two statics are fields shoved into the flags byte for transmission.
+ // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other
+ // is left in place
+ public static final byte MESSAGE_HAS_SECURE_PART = (byte)0x02;
+ public static final byte MESSAGE_IS_RETRY = (byte)0x04;
+
+ public static final byte MESSAGE_IS_RETRY_MASK = (byte)0xFB;
+
+ // Tentative workaround to avoid OOM stated in #46754.
+ public static final ThreadLocal<Integer> messageType = new ThreadLocal<Integer>();
+
+ Version version;
+
+ /**
+ * Creates a new message with the given number of parts
+ */
+ public Message(int numberOfParts, Version destVersion) {
+ this.version = destVersion;
+ Assert.assertTrue(destVersion != null, "Attempt to create an unversioned message");
+ partsList = new Part[numberOfParts];
+ this.numberOfParts = numberOfParts;
+ for (int i=0;i<partsList.length;i++) {
+ partsList[i] = new Part();
+ }
+ }
+
+ public boolean isSecureMode() {
+ return securePart != null;
+ }
+
+ public byte[] getSecureBytes()
+ throws IOException, ClassNotFoundException {
+ return (byte[])this.securePart.getObject();
+ }
+
+ public void setMessageType(int msgType) {
+ this.messageModified = true;
+ if (!MessageType.validate(msgType)) {
+ throw new IllegalArgumentException(LocalizedStrings.Message_INVALID_MESSAGETYPE.toLocalizedString());
+ }
+ this.msgType = msgType;
+ }
+
+ public void setVersion(Version clientVersion) {
+ this.version = clientVersion;
+ }
+
+ public void setMessageHasSecurePartFlag() {
+ this.flags = (byte)(this.flags | MESSAGE_HAS_SECURE_PART);
+ }
+
+ /**
+  * Clears the {@link #MESSAGE_HAS_SECURE_PART} bit in the flags byte,
+  * leaving every other flag bit untouched.
+  */
+ public void clearMessageHasSecurePartFlag() {
+   // AND with the complement of the bit. ANDing with the bit itself (as this
+   // method used to) kept that bit and wiped all of the others instead.
+   this.flags = (byte)(this.flags & ~MESSAGE_HAS_SECURE_PART);
+ }
+
+ /**
+  * Sets the number of {@link Part}s carried in the payload of this message,
+  * growing the internal part array when it is too small. Existing parts are
+  * kept and the part cursor is reset to the first part.
+  *
+  * @param numberOfParts the number of parts this message will hold
+  */
+ public void setNumberOfParts(int numberOfParts) {
+   //TODO:hitesh need to add security header here from server
+   //need to insure it is not chunked message
+   //should we look message type to avoid internal message like ping
+   this.messageModified = true;
+   this.currentPart = 0;
+   this.numberOfParts = numberOfParts;
+   final Part[] existing = this.partsList;
+   if (numberOfParts <= existing.length) {
+     return; // the current array is already large enough
+   }
+   final Part[] grown = new Part[numberOfParts];
+   System.arraycopy(existing, 0, grown, 0, existing.length);
+   for (int idx = existing.length; idx < grown.length; idx++) {
+     grown[idx] = new Part();
+   }
+   this.partsList = grown;
+ }
++
++ /**
++ * For boundary testing we may need to inject mock parts
++ * @param parts
++ */
++ void setParts(Part[] parts) {
++ this.partsList = parts;
++ }
+
+ public void setTransactionId(int transactionId) {
+ this.messageModified = true;
+ this.transactionId = transactionId;
+ }
+
+ public void setIsRetry() {
+ this.isRetry = true;
+ }
+
+ /**
+ * This returns true if the message has been marked as having been previously
+ * transmitted to a different server.
+ */
+ public boolean isRetry() {
+ return this.isRetry;
+ }
+
+ /*Sets size for HDOS chunk.*/
+ public void setChunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+
+ /**
+ * When building a Message this will return the number of the
+ * next Part to be added to the message
+ */
+ public int getNextPartNumber() {
+ return this.currentPart;
+ }
+
+ public void addStringPart(String str) {
+ if (str==null) {
+ addRawPart((byte[])null, false);
+ }
+ else {
+ HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+ this.messageModified = true;
+ Part part = partsList[this.currentPart];
+ part.setPartState(hdos, false);
+ this.currentPart++;
+ }
+ }
+
+ /**
+ * Adds a new part to this message that contains a <code>byte</code>
+ * array (as opposed to a serialized object).
+ *
+ * @param newPart the bytes to add; forwarded to
+ * {@link #addRawPart(byte[], boolean)} with <code>isObject</code> false
+ *
+ * @see #addRawPart(byte[], boolean)
+ */
+ public void addBytesPart(byte[] newPart) {
+ addRawPart(newPart, false);
+ }
+
+ public void addStringOrObjPart(Object o) {
+ if (o instanceof String || o == null) {
+ addStringPart((String)o);
+ } else {
+ // Note even if o is a byte[] we need to serialize it.
+ // This could be cleaned up but it would require C client code to change.
+ serializeAndAddPart(o, false);
+ }
+ }
+
+ public void addDeltaPart(HeapDataOutputStream hdos) {
+ this.messageModified = true;
+ Part part = partsList[this.currentPart];
+ part.setPartState(hdos, false);
+ this.currentPart++;
+ }
+
+ public void addObjPart(Object o) {
+ addObjPart(o, false);
+ }
+ /**
+ * Like addObjPart(Object) but also prefers to reference
+ * objects in the part instead of copying them into a byte buffer.
+ */
+ public void addObjPartNoCopying(Object o) {
+ if (o == null || o instanceof byte[]) {
+ addRawPart((byte[])o, false);
+ } else {
+ serializeAndAddPartNoCopying(o);
+ }
+ }
+ public void addObjPart(Object o, boolean zipValues) {
+ if (o == null || o instanceof byte[]) {
+ addRawPart((byte[])o, false);
+ } else {
+ serializeAndAddPart(o, zipValues);
+ }
+ }
+ public void addPartInAnyForm(@Unretained Object o, boolean isObject) {
+ if (o == null) {
+ addRawPart((byte[])o, false);
+ } else if (o instanceof byte[]) {
+ addRawPart((byte[])o, isObject);
+ } else if (o instanceof StoredObject) {
+ // It is possible it is an off-heap StoredObject that contains a simple non-object byte[].
+ this.messageModified = true;
+ Part part = partsList[this.currentPart];
+ part.setPartState((StoredObject)o, isObject);
+ this.currentPart++;
+ } else {
+ serializeAndAddPart(o, false);
+ }
+ }
+
+ private void serializeAndAddPartNoCopying(Object o) {
+ HeapDataOutputStream hdos;
+ Version v = version;
+ if (version.equals(Version.CURRENT)){
+ v = null;
+ }
+ // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources passed to it.
+ hdos = new HeapDataOutputStream(chunkSize, v, true);
+ // TODO OFFHEAP: Change Part to look for an HDOS and just pass a reference to its DirectByteBuffer.
+ // Then change HDOS sendTo(SocketChannel...) to use the GatheringByteChannel to write a bunch of bbs.
+ // TODO OFFHEAP This code optimizes one part which works pretty good for getAll since all the values are
+ // returned in one part. But the following seems even better...
+ // BETTER: change Message to consolidate all the part hdos bb lists into a single bb array and have it do the GatheringByteChannel write.
+ // Message can use slice for the small parts (msg header and part header) that are not in the parts data (its a byte array, Chunk, or HDOS).
+ // EVEN BETTER: the message can have a single HDOS which owns a direct comm buffer. It can reserve space if it does not yet know the value to write (for example the size of the message or part).
+ // If we write something to the HDOS that is direct then it does not need to be copied.
+ // But large heap byte arrays will need to be copied to the hdos (the socket write does this anyway).
+ // If the direct buffer is full then we can allocate another one. If a part is already in a heap byte array
+ // then we could defer copying it by slicing the current direct bb and then adding the heap byte array
+ // as bb using ByteBuffer.wrap. Once we have all the data in the HDOS we can finally generate the header
+ // and then start working on sending the ByteBuffers to the channel. If we have room in a direct bb then
+ // we can copy a heap bb to it. Otherwise we can write the bb ahead of it which would free up room to copy
+ // the heap bb to the existing direct bb without needing to allocate extra direct bbs.
+ // Delaying the flush uses more direct memory but reduces the number of system calls.
+ try {
+ BlobHelper.serializeTo(o, hdos);
+ } catch (IOException ex) {
+ throw new SerializationException("failed serializing object", ex);
+ }
+ this.messageModified = true;
+ Part part = partsList[this.currentPart];
+ part.setPartState(hdos, true);
+ this.currentPart++;
+
+ }
+
+ private void serializeAndAddPart(Object o, boolean zipValues) {
+ if (zipValues) {
+ throw new UnsupportedOperationException("zipValues no longer supported");
+
+ } else {
+ HeapDataOutputStream hdos;
+ Version v = version;
+ if (version.equals(Version.CURRENT)){
+ v = null;
+ }
+ hdos = new HeapDataOutputStream(chunkSize, v);
+ try {
+ BlobHelper.serializeTo(o, hdos);
+ } catch (IOException ex) {
+ throw new SerializationException("failed serializing object", ex);
+ }
+ this.messageModified = true;
+ Part part = partsList[this.currentPart];
+ part.setPartState(hdos, true);
+ this.currentPart++;
+ }
+ }
+
+ public void addIntPart(int v) {
+ this.messageModified = true;
+ Part part = partsList[this.currentPart];
+ part.setInt(v);
+ this.currentPart++;
+ }
+
+ public void addLongPart(long v) {
+ this.messageModified = true;
+ Part part = partsList[this.currentPart];
+ part.setLong(v);
+ this.currentPart++;
+ }
+
+ /**
+ * Adds a new part to this message that may contain a serialized
+ * object.
+ */
+ public void addRawPart(byte[] newPart,boolean isObject) {
+ this.messageModified = true;
+ Part part = partsList[this.currentPart];
+ part.setPartState(newPart, isObject);
+ this.currentPart++;
+ }
+
+ public int getMessageType() {
+ return this.msgType;
+ }
+
+ public int getPayloadLength() {
+ return this.payloadLength;
+ }
+
+ public int getHeaderLength() {
+ return FIXED_LENGTH;
+ }
+
+ public int getNumberOfParts() {
+ return this.numberOfParts;
+ }
+
+ public int getTransactionId() {
+ return this.transactionId;
+ }
+
+ /**
+  * Returns the part at the given index, first stamping it with this
+  * message's version when one is set.
+  *
+  * @param index zero-based index of the part
+  * @return the part, or null when index is not below the number of parts
+  */
+ public Part getPart(int index) {
+   if (index >= this.numberOfParts) {
+     return null;
+   }
+   final Part requested = partsList[index];
+   if (this.version != null) {
+     requested.setVersion(this.version);
+   }
+   return requested;
+ }
+
+ public static ByteBuffer setTLCommBuffer(ByteBuffer bb) {
+ ByteBuffer result = tlCommBuffer.get();
+ tlCommBuffer.set(bb);
+ return result;
+ }
+
+ public ByteBuffer getCommBuffer() {
+ if (this.cachedCommBuffer != null) {
+ return this.cachedCommBuffer;
+ }
+ else {
+ return tlCommBuffer.get();
+ }
+ }
+
+ /**
+ * Resets this message for reuse: clears the retry marker, payload length,
+ * comm buffer, parts and flags, and gives back the limiter permits and
+ * "being received" statistics recorded while the message was read.
+ */
+ public void clear() {
+ this.isRetry = false;
+ // remember the payload length before zeroing it; it is the amount undone below
+ int len = this.payloadLength;
+ if (len != 0) {
+ this.payloadLength = 0;
+ }
+ if (this.hdrRead) {
+ if (this.msgStats != null) {
+ this.msgStats.decMessagesBeingReceived(len);
+ }
+ }
+ ByteBuffer buffer = getCommBuffer();
+ if (buffer != null) {
+ buffer.clear();
+ }
+ clearParts();
+ // return the data permits taken for the payload and forget the limiter
+ if (len != 0 && this.dataLimiter != null) {
+ this.dataLimiter.release(len);
+ this.dataLimiter = null;
+ this.maxIncomingMessageLength = 0;
+ }
+ // a header was read, so return the single message permit as well
+ if (this.hdrRead) {
+ if (this.msgLimiter != null) {
+ this.msgLimiter.release(1);
+ this.msgLimiter = null;
+ }
+ this.hdrRead = false;
+ }
+ this.flags = 0;
+ }
+
+ protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) {
+ //TODO:hitesh setting second bit of flags byte for client
+ //this is not require but this makes all changes easily at client side right now
+ //just see this bit and process security header
+ byte flagsByte = this.flags;
+ if (isSecurityHeader) {
+ flagsByte |= MESSAGE_HAS_SECURE_PART;
+ }
+ if (this.isRetry) {
+ flagsByte |= MESSAGE_IS_RETRY;
+ }
+ getCommBuffer()
+ .putInt(this.msgType)
+ .putInt(msgLen)
+ .putInt(this.numberOfParts)
+ .putInt(this.transactionId)
+ .put(flagsByte);
+ }
+
+ protected Part getSecurityPart() {
+ if (this.sc != null ) {
+ //look types right put get etc
+ return this.sc.updateAndGetSecurityPart();
+ }
+ return null;
+ }
+
+ public void setSecurePart(byte[] bytes) {
+ this.securePart = new Part();
+ this.securePart.setPartState(bytes, false);
+ }
+
+ public void setMetaRegion(boolean isMetaRegion) {
+ this.isMetaRegion = isMetaRegion;
+ }
+
+ public boolean getAndResetIsMetaRegion() {
+ boolean isMetaRegion = this.isMetaRegion;
+ this.isMetaRegion = false;
+ return isMetaRegion;
+ }
+
+ /**
+ * Sends this message out on its socket: writes the header followed by each
+ * part (plus the security part, if there is one) through the comm buffer.
+ *
+ * @param clearMessage if true the parts are cleared after sending
+ * @throws MessageTooLargeException if the total size exceeds Integer.MAX_VALUE
+ * or the gemfire.client.max-message-size setting
+ * @throws IOException if there is no comm buffer, no socket, or a write fails
+ */
+ protected void sendBytes(boolean clearMessage) throws IOException {
+ if (this.sc != null) {
+ // Keep track of the fact that we are making progress.
+ this.sc.updateProcessingMessage();
+ }
+ if (this.socket != null) {
+ final ByteBuffer cb = getCommBuffer();
+ if (cb == null) {
+ throw new IOException("No buffer");
+ }
++ int msgLen = 0;
+ synchronized(cb) {
- int numOfSecureParts = 0;
- Part securityPart = this.getSecurityPart();
- boolean isSecurityHeader = false;
++ // lengths are accumulated as longs so the sum cannot overflow before it is checked
++ long totalPartLen = 0;
++ long headerLen = 0;
++ int partsToTransmit = this.numberOfParts;
+
- if (securityPart != null) {
- isSecurityHeader = true;
- numOfSecureParts = 1;
- }
- else if (this.securePart != null) {
- // This is a client sending this message.
- securityPart = this.securePart;
- isSecurityHeader = true;
- numOfSecureParts = 1;
- }
-
- int totalPartLen = 0;
- for (int i=0;i<this.numberOfParts;i++){
++ for (int i=0; i < this.numberOfParts; i++) {
+ Part part = this.partsList[i];
++ headerLen += PART_HEADER_SIZE;
+ totalPartLen += part.getLength();
+ }
+
- if(numOfSecureParts == 1) {
++ // use the part from getSecurityPart() if it returns one, else the one set via setSecurePart
++ Part securityPart = this.getSecurityPart();
++ if (securityPart == null) {
++ securityPart = this.securePart;
++ }
++ if (securityPart != null) {
++ headerLen += PART_HEADER_SIZE;
+ totalPartLen += securityPart.getLength();
++ partsToTransmit++;
+ }
- int msgLen = (PART_HEADER_SIZE * (this.numberOfParts + numOfSecureParts)) + totalPartLen;
++
++ if ( (headerLen + totalPartLen) > Integer.MAX_VALUE ) {
++ throw new MessageTooLargeException("Message size (" + (headerLen + totalPartLen)
++ + ") exceeds maximum integer value");
++ }
++
++ msgLen = (int)(headerLen + totalPartLen);
++
++ if (msgLen > MAX_MESSAGE_SIZE) {
++ throw new MessageTooLargeException("Message size(" + msgLen
++ + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")");
++ }
++
+ cb.clear();
- packHeaderInfoForSending(msgLen, isSecurityHeader);
- for (int i=0;i<this.numberOfParts + numOfSecureParts;i++) {
- Part part = null;
- if(i == this.numberOfParts) {
- part = securityPart;
- }
- else {
- part = partsList[i];
- }
++ packHeaderInfoForSending(msgLen, (securityPart != null));
++ for (int i=0; i < partsToTransmit; i++) {
++ // the security part, when present, is transmitted after the regular parts
++ Part part = (i == this.numberOfParts) ? securityPart : partsList[i];
++
+ // make room for the part header (length int + type byte)
+ if (cb.remaining() < PART_HEADER_SIZE) {
+ flushBuffer();
+ }
++
+ int partLen = part.getLength();
+ cb.putInt(partLen);
+ cb.put(part.getTypeCode());
+ // parts that fit are copied into the buffer; larger ones are written
+ // to the channel/stream directly after flushing what is buffered
+ if (partLen <= cb.remaining()) {
- part.sendTo(cb);
++ part.writeTo(cb);
+ } else {
+ flushBuffer();
- // send partBytes
+ if (this.sockCh != null) {
- part.sendTo(this.sockCh, cb);
++ part.writeTo(this.sockCh, cb);
+ } else {
- part.sendTo(this.os, cb);
++ part.writeTo(this.os, cb);
+ }
+ if (this.msgStats != null) {
+ this.msgStats.incSentBytes(partLen);
+ }
+ }
+ }
+ if (cb.position() != 0) {
+ flushBuffer();
+ }
+ this.messageModified = false;
+ if (this.sockCh == null) {
+ this.os.flush();
+ }
+ }
+ if(clearMessage) {
+ clearParts();
+ }
+ }
+ else {
+ throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
+ }
+ }
+
+ /**
+ * Writes the current contents of the comm buffer to the socket channel (or,
+ * when there is no channel, to the output stream), adds the number of bytes
+ * written to the message stats, and clears the buffer for reuse.
+ *
+ * @throws IOException if the write fails
+ */
+ protected void flushBuffer() throws IOException {
+ final ByteBuffer cb = getCommBuffer();
+ if (this.sockCh != null) {
+ cb.flip();
+ // keep writing until the channel has consumed the whole buffer
+ do {
+ this.sockCh.write(cb);
+ } while (cb.remaining() > 0);
+ } else {
+ this.os.write(cb.array(), 0, cb.position());
+ }
+ // in both branches position() now equals the number of bytes written
+ if (this.msgStats != null) {
+ this.msgStats.incSentBytes(cb.position());
+ }
+ cb.clear();
+ }
+
+ private void read()
+ throws IOException {
+ clearParts();
+ //TODO:Hitesh ??? for server changes make sure sc is not null as this class also used by client :(
+ readHeaderAndPayload();
+ }
+
+ /**
+ * Read the actual bytes of the header off the socket
+ */
+ protected final void fetchHeader() throws IOException {
+ final ByteBuffer cb = getCommBuffer();
+ cb.clear();
+ // msgType is invalidated here and can be used as an indicator
+ // of problems reading the message
+ this.msgType = MessageType.INVALID;
+
+ int hdr = 0;
+
+ final int headerLength = getHeaderLength();
+ if (this.sockCh != null) {
+ cb.limit(headerLength);
+ do {
+ int bytesRead = this.sockCh.read(cb);
+ //System.out.println("DEBUG: fetchHeader read " + bytesRead + " bytes commBuffer=" + cb);
+ if (bytesRead == -1) {
+ throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER.toLocalizedString());
+ }
+ if (this.msgStats != null) {
+ this.msgStats.incReceivedBytes(bytesRead);
+ }
+ } while (cb.remaining() > 0);
+ cb.flip();
+ } else {
+ do {
+ int bytesRead = -1;
+ try {
+ bytesRead = this.is.read(cb.array(),hdr, headerLength-hdr);
+ }
+ catch (SocketTimeoutException e) {
+ // bytesRead = 0;
+ // TODO add a cancellation check
+ throw e;
+ }
+ if (bytesRead == -1) {
+ throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER.toLocalizedString());
+ }
+ hdr += bytesRead;
+ if (this.msgStats != null) {
+ this.msgStats.incReceivedBytes(bytesRead);
+ }
+ } while (hdr < headerLength);
+
+ // now setup the commBuffer for the caller to parse it
+ cb.rewind();
+ }
+ }
+
+ /**
+  * Reads one complete message from the socket: fetches and parses the fixed
+  * header, acquires the optional message and data limiter permits, and then
+  * reads the payload parts.
+  *
+  * @throws IOException if the message type is invalid, the payload exceeds
+  *         the maximum incoming length, a limiter permit could not be
+  *         acquired in time, or reading from the socket fails
+  */
+ private void readHeaderAndPayload()
+     throws IOException {
+   //TODO:Hitesh ???
+   fetchHeader();
+   final ByteBuffer cb = getCommBuffer();
+   final int type = cb.getInt();
+   final int len = cb.getInt();
+   final int numParts = cb.getInt();
+   final int txid = cb.getInt();
+   byte bits = cb.get();
+   cb.clear();
+
+   if (!MessageType.validate(type)) {
+     throw new IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER.toLocalizedString(Integer.valueOf(type)));
+   }
+   int timeToWait = 0;
+   if (this.sc != null) {
+     // Keep track of the fact that a message is being processed.
+     this.sc.setProcessingMessage();
+     timeToWait = sc.getClientReadTimeout();
+   }
+   this.hdrRead = true;
+   if (this.msgLimiter != null) {
+     for (;;) {
+       // same null guard as the data limiter loop below
+       if (this.sc != null) {
+         this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+       }
+       boolean interrupted = Thread.interrupted();
+       try {
+         if (timeToWait == 0) {
+           this.msgLimiter.acquire(1);
+         }
+         else {
+           if (!this.msgLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) {
+             if (this.msgStats != null
+                 && this.msgStats instanceof CacheServerStats) {
+               ((CacheServerStats)this.msgStats).incConnectionsTimedOut();
+             }
+             throw new IOException(LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_MESSAGE_LIMITER_AFTER_WAITING_0_MILLISECONDS.toLocalizedString(Integer.valueOf(timeToWait)));
+           }
+         }
+         break;
+       }
+       catch (InterruptedException e) {
+         // retry the acquire; the interrupt is restored in the finally block
+         interrupted = true;
+       }
+       finally {
+         if (interrupted) {
+           Thread.currentThread().interrupt();
+         }
+       }
+     } // for
+   }
+   if (len > 0) {
+     if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) {
+       throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1.toLocalizedString(new Object[] {Integer.valueOf(len), Integer.valueOf(this.maxIncomingMessageLength)}));
+     }
+     if (this.dataLimiter != null) {
+       for (;;) {
+         if (sc != null) {
+           this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+         }
+         boolean interrupted = Thread.interrupted();
+         try {
+           if (timeToWait == 0) {
+             this.dataLimiter.acquire(len);
+           }
+           else {
+             int newTimeToWait = timeToWait;
+             if (this.msgLimiter != null) {
+               // may have waited for msg limit so recalc time to wait
+               newTimeToWait -= (int)sc.getCurrentMessageProcessingTime();
+             }
+             // Acquire len permits from the DATA limiter. This used to call
+             // msgLimiter.tryAcquire(1, ...), which took a second message
+             // permit (or threw NPE without a message limiter) and left
+             // clear() releasing data permits that were never acquired.
+             if (newTimeToWait <= 0 || !this.dataLimiter.tryAcquire(len, newTimeToWait, TimeUnit.MILLISECONDS)) {
+               throw new IOException(LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_DATA_LIMITER_AFTER_WAITING_0_MILLISECONDS.toLocalizedString(timeToWait));
+             }
+           }
+           this.payloadLength = len; // makes sure payloadLength gets set now so we will release the semaphore
+           break; // success
+         }
+         catch (InterruptedException e) {
+           // retry the acquire; the interrupt is restored in the finally block
+           interrupted = true;
+         }
+         finally {
+           if (interrupted) {
+             Thread.currentThread().interrupt();
+           }
+         }
+       }
+     }
+   }
+   if (this.msgStats != null) {
+     this.msgStats.incMessagesBeingReceived(len);
+     this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear
+   }
+
+   // the retry bit is pulled out into its own field and stripped from the stored flags
+   this.isRetry = (bits & MESSAGE_IS_RETRY) != 0;
+   bits = (byte)(bits & MESSAGE_IS_RETRY_MASK);
+   this.flags = bits;
+   // TODO why is the msgType set twice, here and after reading the payload fields?
+   this.msgType = type;
+
+   readPayloadFields(numParts, len);
+
+   // Set the header and payload fields only after receiving all the
+   // socket data, providing better message consistency in the face
+   // of exceptional conditions (e.g. IO problems, timeouts etc.)
+   this.msgType = type;
+   this.payloadLength = len;
+   // this.numberOfParts = numParts; Already set in setPayloadFields via setNumberOfParts
+   this.transactionId = txid;
+   this.flags = bits;
+   if (this.sc != null) {
+     // Keep track of the fact that a message is being processed.
+     this.sc.updateProcessingMessage();
+   }
+ }
+
+ protected void readPayloadFields(final int numParts, final int len)
+ throws IOException {
+ //TODO:Hitesh
+ if (len > 0 && numParts <= 0 ||
+ len <= 0 && numParts > 0) {
+ throw new IOException(LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT.toLocalizedString(
+ new Object[] {Integer.valueOf(len), Integer.valueOf(numParts)}));
+ }
+
+ Integer msgType = messageType.get();
+ if (msgType != null && msgType == MessageType.PING) {
+ messageType.set(null); // set it to null right away.
+ int pingParts = 10; // Some number which will not throw OOM but still be acceptable for a ping operation.
+ if (numParts > pingParts) {
+ throw new IOException("Part length ( " + numParts
+ + " ) is inconsistent for " + MessageType.getString(msgType)
+ + " operation.");
+ }
+ }
+ setNumberOfParts(numParts);
+ if (numParts <= 0)
+ return;
+
+ if (len < 0) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.Message_RPL_NEG_LEN__0, len));
+ throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
+ }
+
+ final ByteBuffer cb = getCommBuffer();
+ cb.clear();
+ cb.flip();
+
+ int readSecurePart = 0;
+ //TODO:Hitesh look if securePart can be cached here
+ readSecurePart = checkAndSetSecurityPart();
+
+ int bytesRemaining = len;
+ for (int i = 0; ((i < numParts + readSecurePart) || ((readSecurePart == 1) && (cb
+ .remaining() > 0))); i++) {
+ int bytesReadThisTime = readPartChunk(bytesRemaining);
+ bytesRemaining -= bytesReadThisTime;
+
+ Part part;
+
+ if(i < numParts) {
+ part = this.partsList[i];
+ }
+ else {
+ part = this.securePart;
+ }
+
+ int partLen = cb.getInt();
+ byte partType = cb.get();
+ byte[] partBytes = null;
+ if (partLen > 0) {
+ partBytes = new byte[partLen];
+ int alreadyReadBytes = cb.remaining();
+ if (alreadyReadBytes > 0) {
+ if (partLen < alreadyReadBytes) {
+ alreadyReadBytes = partLen;
+ }
+ cb.get(partBytes, 0, alreadyReadBytes);
+ }
+ // now we need to read partLen - alreadyReadBytes off the wire
+ int off = alreadyReadBytes;
+ int remaining = partLen - off;
+ while (remaining > 0) {
+ if (this.sockCh != null) {
+ int bytesThisTime = remaining;
+ cb.clear();
+ if (bytesThisTime > cb.capacity()) {
+ bytesThisTime = cb.capacity();
+ }
+ cb.limit(bytesThisTime);
+ int res = this.sockCh.read(cb);
+ if (res != -1) {
+ cb.flip();
+ bytesRemaining -= res;
+ remaining -= res;
+ cb.get(partBytes, off, res);
+ off += res;
+ if (this.msgStats != null) {
+ this.msgStats.incReceivedBytes(res);
+ }
+ } else {
+ throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_A_PART.toLocalizedString());
+ }
+ } else {
+ int res = 0;
+ try {
+ res = this.is.read(partBytes, off, remaining);
+ }
+ catch (SocketTimeoutException e) {
+ // TODO: add cancellation check
+ throw e;
+ }
+ if (res != -1) {
+ bytesRemaining -= res;
+ remaining -= res;
+ off += res;
+ if (this.msgStats != null) {
+ this.msgStats.incReceivedBytes(res);
+ }
+ } else {
+ throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_A_PART.toLocalizedString());
+ }
+ }
+ }
+ }
+ part.init(partBytes, partType);
+ }
+ }
+
+ protected int checkAndSetSecurityPart() {
+ if ((this.flags | MESSAGE_HAS_SECURE_PART) == this.flags) {
+ this.securePart = new Part();
+ return 1;
+ }
+ else {
+ this.securePart = null;
+ return 0;
+ }
+ }
+
+ /**
+ * @param bytesRemaining the most bytes we can read
+ * @return the number of bytes read into commBuffer
+ */
+ private int readPartChunk(int bytesRemaining) throws IOException {
+ final ByteBuffer cb = getCommBuffer();
+ if (cb.remaining() >= PART_HEADER_SIZE) {
+ // we already have the next part header in commBuffer so just return
+ return 0;
+ }
+ if (cb.position() != 0) {
+ cb.compact();
+ } else {
+ cb.position(cb.limit());
+ cb.limit(cb.capacity());
+ }
+ int bytesRead = 0;
+ if (this.sc != null) {
+ // Keep track of the fact that we are making progress
+ this.sc.updateProcessingMessage();
+ }
+ if (this.sockCh != null) {
+ int remaining = cb.remaining();
+ if (remaining > bytesRemaining) {
+ remaining = bytesRemaining;
+ cb.limit(cb.position()+bytesRemaining);
+ }
+ while (remaining > 0) {
+ int res = this.sockCh.read(cb);
+ if (res != -1) {
+ remaining -= res;
+ bytesRead += res;
+ if (this.msgStats != null) {
+ this.msgStats.incReceivedBytes(res);
+ }
+ } else {
+ throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_PAYLOAD.toLocalizedString());
+ }
+ }
+
+ } else {
+ int bufSpace = cb.capacity() - cb.position();
+ int bytesToRead = bufSpace;
+ if (bytesRemaining < bytesToRead) {
+ bytesToRead = bytesRemaining;
+ }
+ int pos = cb.position();
+ while (bytesToRead > 0) {
+ int res = 0;
+ try {
+ res = this.is.read(cb.array(), pos, bytesToRead);
+ }
+ catch (SocketTimeoutException e) {
+ // TODO add a cancellation check
+ throw e;
+ }
+ if (res != -1) {
+ bytesToRead -= res;
+ pos += res;
+ bytesRead += res;
+ if (this.msgStats != null) {
+ this.msgStats.incReceivedBytes(res);
+ }
+ } else {
+ throw new EOFException(LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_PAYLOAD.toLocalizedString());
+ }
+ }
+ cb.position(pos);
+ }
+ cb.flip();
+ return bytesRead;
+ }
+
+ /**
+  * Clears every part held by this message and resets the part cursor so the
+  * next part added becomes the first one.
+  */
+ public void clearParts() {
+   for (Part each : partsList) {
+     each.clear();
+   }
+   this.currentPart = 0;
+ }
+
+ /**
+  * Returns a diagnostic description of this message: the header fields
+  * followed by the string form of each of its parts.
+  */
+ @Override
+ public String toString() {
+   // the buffer never leaves this method, so the unsynchronized builder is enough
+   StringBuilder sb = new StringBuilder();
+   sb.append("type=").append(MessageType.getString(msgType));
+   sb.append("; payloadLength=").append(payloadLength);
+   sb.append("; numberOfParts=").append(numberOfParts);
+   sb.append("; transactionId=").append(transactionId);
+   sb.append("; currentPart=").append(currentPart);
+   sb.append("; messageModified=").append(messageModified);
+   // mask to the low 8 bits so a flags byte with its high bit set prints as
+   // two hex digits instead of a sign-extended eight
+   sb.append("; flags=").append(Integer.toHexString(flags & 0xFF));
+   for (int i = 0; i < numberOfParts; i++) {
+     sb.append("; part[").append(i).append("]={");
+     sb.append(this.partsList[i].toString());
+     sb.append("}");
+   }
+   return sb.toString();
+ }
+
+
+ public void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
+ this.sc = sc;
+ setComms(socket, bb, msgStats);
+ }
+
+ public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
+ this.sockCh = socket.getChannel();
+ if (this.sockCh == null) {
+ setComms(socket, SocketUtils.getInputStream(socket), SocketUtils.getOutputStream(socket), bb, msgStats);
+ } else {
+ setComms(socket, null, null, bb, msgStats);
+ }
+ }
+
+ public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb, MessageStats msgStats)
+ throws IOException
+ {
+ Assert.assertTrue(socket != null);
+ this.socket = socket;
+ this.sockCh = socket.getChannel();
+ this.is = is;
+ this.os = os;
+ this.cachedCommBuffer = bb;
+ this.msgStats = msgStats;
+ }
+ /**
+ * Undo any state changes done by setComms.
+ * @since 5.7
+ */
+ public void unsetComms() {
+ this.socket = null;
+ this.sockCh = null;
+ this.is = null;
+ this.os = null;
+ this.cachedCommBuffer = null;
+ this.msgStats = null;
+ }
+
+ /**
+ * Sends this message over the socket it was configured with and then
+ * clears its parts; equivalent to <code>send(true)</code>.
+ *
+ * @throws IOException if sending fails
+ */
+ public void send()
+ throws IOException {
+ send(true);
+ }
+
+ public void send(ServerConnection servConn)
+ throws IOException {
+ if (this.sc != servConn) throw new IllegalStateException("this.sc was not correctly set");
+ send(true);
+ }
+
+ /**
+ * Sends this message over the socket it was configured with.
+ *
+ * @param clearMessage if true the parts of this message are cleared after
+ * it has been sent
+ * @throws IOException if sending fails
+ */
+ public void send(boolean clearMessage)
+ throws IOException {
+ sendBytes(clearMessage);
+ }
+
+ /**
+ * Populates the stats of this <code>Message</code> with information
+ * received via its socket
+ */
+ public void recv()
+ throws IOException {
+ if (this.socket != null) {
+ synchronized(getCommBuffer()) {
+ read();
+ }
+ }
+ else {
+ throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
+ }
+ }
+ public void recv(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter, Semaphore msgLimiter)
+ throws IOException {
+ this.sc = sc;
+ this.maxIncomingMessageLength = maxMessageLength;
+ this.dataLimiter = dataLimiter;
+ this.msgLimiter = msgLimiter;
+ recv();
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageTooLargeException.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageTooLargeException.java
index 0000000,0000000..e5cac59
new file mode 100755
--- /dev/null
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/MessageTooLargeException.java
@@@ -1,0 -1,0 +1,29 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package com.gemstone.gemfire.internal.cache.tier.sockets;
++
++import java.io.IOException;
++
++/**
++ * A specialized {@link IOException} that only carries a detail message.
++ * As its name suggests it is meant for reporting a message that is too
++ * large; being a distinct type lets callers single it out with
++ * <code>instanceof</code>.
++ */
++public class MessageTooLargeException extends IOException {
++
++  private static final long serialVersionUID = -8970585803331525833L;
++
++  /**
++   * @param message the detail message, handed to {@link IOException}
++   */
++  public MessageTooLargeException(final String message) {
++    super(message);
++  }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
index 0000000,f5f6326..80b5c0a
mode 000000,100755..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/Part.java
@@@ -1,0 -1,452 +1,452 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+ import com.gemstone.gemfire.internal.*;
+ import com.gemstone.gemfire.internal.cache.CachedDeserializable;
-import com.gemstone.gemfire.internal.offheap.Chunk;
++import com.gemstone.gemfire.internal.offheap.ObjectChunk;
+ import com.gemstone.gemfire.internal.offheap.DataAsAddress;
+ import com.gemstone.gemfire.internal.offheap.StoredObject;
+ import com.gemstone.gemfire.internal.offheap.UnsafeMemoryChunk;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.nio.channels.*;
+
+ /**
+ * Represents one unit of information (essentially a <code>byte</code>
+ * array) in the wire protocol. Each server connection runs in its
+ * own thread to maximize concurrency and improve response times to
+ * edge requests
+ *
+ * @see Message
+ *
+ * @author Sudhir Menon
+ * @since 2.0.2
+ */
+ public class Part {
+ private static final byte BYTE_CODE = 0;
+ private static final byte OBJECT_CODE = 1;
+
+ private Version version;
+ /**
+ * Used to represent an empty byte array for bug 36279
+ * @since 5.1
+ */
+ private static final byte EMPTY_BYTEARRAY_CODE = 2;
+ private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+ /** The payload of this part.
+ * Could be null, a byte[] or a HeapDataOutputStream on the send side.
+ * Could be null, or a byte[] on the receiver side.
+ */
+ private Object part;
+
+ /** Is the payload (<code>part</code>) a serialized object? */
+ private byte typeCode;
+
+ public void init(byte[] v, byte tc) {
+ if (tc == EMPTY_BYTEARRAY_CODE) {
+ this.part = EMPTY_BYTE_ARRAY;
+ }
+ else {
+ this.part = v;
+ }
+ this.typeCode = tc;
+ }
+
+
+ /**
+  * Resets this part: the type code goes back to BYTE_CODE and the
+  * payload reference is dropped.
+  */
+ public void clear() {
+   typeCode = BYTE_CODE;
+   part = null;
+ }
+
+ /**
+  * Tells whether this part stands for "no value": either there is no
+  * payload at all, or the part is an object whose byte[] payload is the
+  * single byte DSCODE.NULL.
+  */
+ public boolean isNull() {
+   if (this.part == null) {
+     return true;
+   }
+   if (!isObject() || !(this.part instanceof byte[])) {
+     return false;
+   }
+   final byte[] payload = (byte[]) this.part;
+   return payload.length == 1 && payload[0] == DSCODE.NULL;
+ }
+ /** Returns true when the type code of this part is OBJECT_CODE. */
+ public boolean isObject() {
+   return OBJECT_CODE == this.typeCode;
+ }
+ /**
+  * Returns true when the type code of this part is BYTE_CODE or
+  * EMPTY_BYTEARRAY_CODE.
+  */
+ public boolean isBytes() {
+   final byte code = this.typeCode;
+   return code == BYTE_CODE || code == EMPTY_BYTEARRAY_CODE;
+ }
+
+ /**
+  * Sets the payload and type code from a byte array.
+  * An object payload is stored as given with OBJECT_CODE. A non-object,
+  * non-null, zero-length array is replaced by the shared empty array and
+  * tagged EMPTY_BYTEARRAY_CODE; anything else (including null) is stored
+  * as given with BYTE_CODE.
+  *
+  * @param b the payload bytes, may be null
+  * @param isObject whether <code>b</code> is a serialized object
+  */
+ public void setPartState(byte[] b, boolean isObject) {
+   if (isObject) {
+     this.typeCode = OBJECT_CODE;
+     this.part = b;
+     return;
+   }
+   final boolean emptyArray = (b != null) && (b.length == 0);
+   this.typeCode = emptyArray ? EMPTY_BYTEARRAY_CODE : BYTE_CODE;
+   this.part = emptyArray ? EMPTY_BYTE_ARRAY : b;
+ }
+
+ /**
+  * Sets the payload and type code from a HeapDataOutputStream.
+  * A non-object, non-null stream of size 0 is replaced by the shared
+  * empty byte array and tagged EMPTY_BYTEARRAY_CODE. Otherwise the stream
+  * itself becomes the payload, tagged OBJECT_CODE or BYTE_CODE according
+  * to <code>isObject</code>.
+  *
+  * @param os the stream holding the payload, may be null
+  * @param isObject whether the stream holds a serialized object
+  */
+ public void setPartState(HeapDataOutputStream os, boolean isObject) {
+   if (!isObject && os != null && os.size() == 0) {
+     this.typeCode = EMPTY_BYTEARRAY_CODE;
+     this.part = EMPTY_BYTE_ARRAY;
+     return;
+   }
+   this.typeCode = isObject ? OBJECT_CODE : BYTE_CODE;
+   this.part = os;
+ }
+ public void setPartState(StoredObject so, boolean isObject) { // sets payload and type code from a StoredObject
+ if (isObject) {
+ this.typeCode = OBJECT_CODE;
+ } else if (so.getValueSizeInBytes() == 0) { // non-object with zero-length value: use the shared empty array and stop
+ this.typeCode = EMPTY_BYTEARRAY_CODE;
+ this.part = EMPTY_BYTE_ARRAY;
+ return;
+ } else {
+ this.typeCode = BYTE_CODE;
+ }
+ if (so instanceof DataAsAddress) { // DataAsAddress: keep its raw bytes; anything else is kept as the chunk itself
+ this.part = ((DataAsAddress)so).getRawBytes();
+ } else {
- this.part = (Chunk)so;
++ this.part = (ObjectChunk)so;
+ }
+ } // NOTE(review): unlike the byte[] and HeapDataOutputStream overloads, a null "so" with isObject == false is dereferenced above - confirm callers never pass null
+ /** Returns the type code currently recorded for this part. */
+ public byte getTypeCode() {
+   return typeCode;
+ }
+ /**
+ * Return the length of the part. The length is the number of bytes needed
+ * for its serialized form.
+ * A part without a payload has length 0. For a byte[] payload this is the
+ * array length, for an ObjectChunk payload its value size in bytes, and
+ * for any other payload (treated as a HeapDataOutputStream) the stream size.
+ * @return the payload length in bytes
+ */
+ public int getLength() {
+ if (this.part == null) {
+ return 0;
+ } else if (this.part instanceof byte[]) {
+ return ((byte[])this.part).length;
- } else if (this.part instanceof Chunk) {
- return ((Chunk) this.part).getValueSizeInBytes();
++ } else if (this.part instanceof ObjectChunk) {
++ return ((ObjectChunk) this.part).getValueSizeInBytes();
+ } else {
+ return ((HeapDataOutputStream)this.part).size();
+ }
+ }
+ /**
+  * Returns the payload decoded by CacheServerHelper.fromUTF, or null when
+  * this part has no payload. Asserts that the part is of a bytes type.
+  */
+ public String getString() {
+   final Object payload = this.part;
+   if (payload == null) {
+     return null;
+   }
+   final boolean bytesPart = isBytes();
+   if (!bytesPart) {
+     Assert.assertTrue(false, "expected String part to be of type BYTE, part ="
+         + this);
+   }
+   return CacheServerHelper.fromUTF((byte[]) payload);
+ }
+
+ /**
+  * Returns the payload decoded as a 4-byte big-endian int.
+  * Asserts that the part is of a bytes type and exactly 4 bytes long.
+  */
+ public int getInt() {
+   if (!isBytes()) {
+     Assert.assertTrue(false, "expected int part to be of type BYTE, part = "
+         + this);
+   }
+   final int length = getLength();
+   if (length != 4) {
+     Assert.assertTrue(false,
+         "expected int length to be 4 but it was " + length
+         + "; part = " + this);
+   }
+   return decodeInt(getSerializedForm(), 0);
+ }
+
+ /**
+  * Decodes a big-endian int from four consecutive bytes.
+  *
+  * @param bytes the array to read from
+  * @param offset index of the most significant byte
+  * @return the int formed by bytes[offset] .. bytes[offset + 3]
+  */
+ public static int decodeInt(byte[] bytes, int offset) {
+   // Mask each byte to its unsigned value first, then move it into place.
+   final int b0 = (bytes[offset + 0] & 0xFF) << 24;
+   final int b1 = (bytes[offset + 1] & 0xFF) << 16;
+   final int b2 = (bytes[offset + 2] & 0xFF) << 8;
+   final int b3 = bytes[offset + 3] & 0xFF;
+   return b0 | b1 | b2 | b3;
+ }
+
+ /**
+  * Makes this part a BYTE_CODE part whose payload is the 4-byte
+  * encoding of <code>v</code> produced by encodeInt.
+  */
+ public void setInt(int v) {
+   final byte[] encoded = new byte[4];
+   encodeInt(v, encoded);
+   this.part = encoded;
+   this.typeCode = BYTE_CODE;
+ }
+
+ /**
+  * Encodes <code>v</code> into the first four bytes of <code>bytes</code>.
+  * @since 5.7
+  */
+ public static void encodeInt(int v, byte[] bytes) {
+   final int startOfArray = 0;
+   encodeInt(v, bytes, startOfArray);
+ }
+
+ /**
+  * Encodes <code>v</code> big-endian into four consecutive bytes.
+  *
+  * @param v the value to encode
+  * @param bytes the array to write into
+  * @param offset index that receives the most significant byte
+  */
+ public static void encodeInt(int v, byte[] bytes, int offset) {
+   // The narrowing cast keeps only the low 8 bits of each shifted value.
+   bytes[offset + 0] = (byte) (v >>> 24);
+   bytes[offset + 1] = (byte) (v >>> 16);
+   bytes[offset + 2] = (byte) (v >>> 8);
+   bytes[offset + 3] = (byte) v;
+ }
+
+ /**
+  * Makes this part a BYTE_CODE part whose payload is the 8-byte
+  * big-endian encoding of <code>v</code>.
+  */
+ public void setLong(long v) {
+   final byte[] encoded = new byte[8];
+   for (int i = 0; i < encoded.length; i++) {
+     // index 0 receives the most significant byte, index 7 the least
+     encoded[i] = (byte) (v >>> (56 - 8 * i));
+   }
+   this.typeCode = BYTE_CODE;
+   this.part = encoded;
+ }
+
+ /**
+  * Returns the payload decoded as an 8-byte big-endian long.
+  * Asserts that the part is of a bytes type and exactly 8 bytes long.
+  */
+ public long getLong() {
+   if (!isBytes()) {
+     Assert.assertTrue(false, "expected long part to be of type BYTE, part = "
+         + this);
+   }
+   final int length = getLength();
+   if (length != 8) {
+     Assert.assertTrue(false,
+         "expected long length to be 8 but it was " + length
+         + "; part = " + this);
+   }
+   final byte[] bytes = getSerializedForm();
+   long value = 0L;
+   for (int i = 0; i < 8; i++) {
+     // shift in one unsigned byte at a time, most significant first
+     value = (value << 8) | (bytes[i] & 0xFFL);
+   }
+   return value;
+ }
+
+
+ /**
+  * Returns the payload when it is a byte[]; returns null when there is no
+  * payload or the payload is of any other kind.
+  */
+ public byte[] getSerializedForm() {
+   // any non-byte[] payload yields null: should not be called on sender side?
+   return (this.part instanceof byte[]) ? (byte[]) this.part : null;
+ }
+ public Object getObject(boolean unzip) throws IOException, ClassNotFoundException {
+ if (isBytes()) {
+ return this.part;
+ }
+ else {
+ if (this.version != null) {
+ return CacheServerHelper.deserialize((byte[])this.part, this.version,
+ unzip);
+ }
+ else {
+ return CacheServerHelper.deserialize((byte[])this.part, unzip);
+ }
+ }
+ }
+ /** Same as {@code getObject(false)}. */
+ public Object getObject() throws IOException, ClassNotFoundException {
+   final boolean unzip = false;
+   return getObject(unzip);
+ }
+
+ /**
+  * Returns {@link #getObject()} for an object part and
+  * {@link #getString()} for any other part.
+  */
+ public Object getStringOrObject() throws IOException, ClassNotFoundException {
+   return isObject() ? getObject() : getString();
+ }
+
+ /**
+ * Write the contents of this part to the specified output stream.
+ * This is only called for parts that will not fit into the commBuffer
+ * so they need to be written directly to the stream.
+ * A stream is used because the client is configured for old IO (instead of nio).
+ * <p>
+ * Nothing is written when the length of this part is 0. A byte[] payload is
+ * written to <code>out</code> in one call. For an ObjectChunk payload a direct
+ * ByteBuffer over the chunk is used when one can be created; otherwise the
+ * chunk is copied byte by byte into <code>buf</code>, which is flushed to
+ * <code>out</code> each time it is full. Any other payload is treated as a
+ * HeapDataOutputStream, which is sent and then rewound.
+ * @param out the stream the payload is written to
+ * @param buf the buffer to use if any data needs to be copied to one
+ * @throws IOException declared for the stream operations performed here
+ */
- public final void sendTo(OutputStream out, ByteBuffer buf) throws IOException {
++ public final void writeTo(OutputStream out, ByteBuffer buf) throws IOException {
+ if (getLength() > 0) {
+ if (this.part instanceof byte[]) {
+ byte[] bytes = (byte[])this.part;
+ out.write(bytes, 0, bytes.length);
- } else if (this.part instanceof Chunk) {
- Chunk c = (Chunk) this.part;
++ } else if (this.part instanceof ObjectChunk) {
++ ObjectChunk c = (ObjectChunk) this.part;
+ ByteBuffer cbb = c.createDirectByteBuffer();
+ if (cbb != null) {
+ HeapDataOutputStream.writeByteBufferToStream(out, buf, cbb);
+ } else {
+ int bytesToSend = c.getDataSize();
+ long addr = c.getAddressForReading(0, bytesToSend);
+ while (bytesToSend > 0) {
+ if (buf.remaining() == 0) { // buf is full: push it to the stream before adding more
+ HeapDataOutputStream.flushStream(out, buf);
+ }
+ buf.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
+ addr++;
+ bytesToSend--;
+ }
+ } // NOTE(review): bytes put after the last flush stay in buf; presumably the caller flushes it - confirm
+ } else {
+ HeapDataOutputStream hdos = (HeapDataOutputStream)this.part;
+ hdos.sendTo(out, buf);
+ hdos.rewind(); // presumably so the same stream can be sent again - TODO confirm
+ }
+ }
+ }
+ /**
+ * Write the contents of this part to the specified byte buffer.
+ * Precondition: caller has already checked the length of this part
+ * and it will fit into "buf".
+ * <p>
+ * Nothing is written when the length of this part is 0. A byte[] payload is
+ * put into <code>buf</code> directly. For an ObjectChunk payload a direct
+ * ByteBuffer over the chunk is put when one can be created; otherwise the
+ * chunk is copied byte by byte. Any other payload is treated as a
+ * HeapDataOutputStream, which is sent to <code>buf</code> and then rewound.
+ * @param buf the buffer that receives the payload
+ */
- public final void sendTo(ByteBuffer buf) {
++ public final void writeTo(ByteBuffer buf) {
+ if (getLength() > 0) {
+ if (this.part instanceof byte[]) {
+ buf.put((byte[])this.part);
- } else if (this.part instanceof Chunk) {
- Chunk c = (Chunk) this.part;
++ } else if (this.part instanceof ObjectChunk) {
++ ObjectChunk c = (ObjectChunk) this.part;
+ ByteBuffer bb = c.createDirectByteBuffer();
+ if (bb != null) {
+ buf.put(bb);
+ } else {
+ int bytesToSend = c.getDataSize();
+ long addr = c.getAddressForReading(0, bytesToSend);
+ while (bytesToSend > 0) { // no capacity check here: relies on the precondition that the part fits into buf
+ buf.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
+ addr++;
+ bytesToSend--;
+ }
+ }
+ } else {
+ HeapDataOutputStream hdos = (HeapDataOutputStream)this.part;
+ hdos.sendTo(buf);
+ hdos.rewind(); // presumably so the same stream can be sent again - TODO confirm
+ }
+ }
+ }
+ /**
+ * Write the contents of this part to the specified socket channel
+ * using the specified byte buffer.
+ * This is only called for parts that will not fit into the commBuffer
+ * so they need to be written directly to the socket.
+ * Precondition: buf contains nothing that needs to be sent
+ * <p>
+ * Nothing is written when the length of this part is 0. A byte[] payload,
+ * and an ObjectChunk payload for which no direct ByteBuffer can be created,
+ * are sent in slices of at most <code>buf.capacity()</code> bytes: each slice
+ * is copied into <code>buf</code> (which is cleared first, hence the
+ * precondition) and written until the buffer is drained. Any other payload
+ * is treated as a HeapDataOutputStream, which is sent and then rewound.
+ * @param sc the socket channel the payload is written to
+ * @param buf scratch buffer used to stage data for the channel
+ * @throws IOException declared for the channel writes performed here
+ */
- public final void sendTo(SocketChannel sc, ByteBuffer buf) throws IOException {
++ public final void writeTo(SocketChannel sc, ByteBuffer buf) throws IOException {
+ if (getLength() > 0) {
+ final int BUF_MAX = buf.capacity();
+ if (this.part instanceof byte[]) {
+ final byte[] bytes = (byte[])this.part;
+ int off = 0;
+ int len = bytes.length;
+ buf.clear();
+ while (len > 0) {
+ int bytesThisTime = len;
+ if (bytesThisTime > BUF_MAX) {
+ bytesThisTime = BUF_MAX;
+ }
+ buf.put(bytes, off, bytesThisTime);
+ len -= bytesThisTime;
+ off += bytesThisTime;
+ buf.flip();
+ while (buf.remaining() > 0) { // a single write may be partial, so keep writing until the slice is gone
+ sc.write(buf);
+ }
+ buf.clear();
+ }
- } else if (this.part instanceof Chunk) {
++ } else if (this.part instanceof ObjectChunk) {
+ // instead of copying the Chunk to buf try to create a direct ByteBuffer and
+ // just write it directly to the socket channel.
- Chunk c = (Chunk) this.part;
++ ObjectChunk c = (ObjectChunk) this.part;
+ ByteBuffer bb = c.createDirectByteBuffer();
+ if (bb != null) {
+ while (bb.remaining() > 0) {
+ sc.write(bb);
+ }
+ } else {
+ int len = c.getDataSize();
+ long addr = c.getAddressForReading(0, len);
+ buf.clear();
+ while (len > 0) {
+ int bytesThisTime = len;
+ if (bytesThisTime > BUF_MAX) {
+ bytesThisTime = BUF_MAX;
+ }
+ len -= bytesThisTime;
+ while (bytesThisTime > 0) { // copy one slice of the chunk into buf, a byte at a time
+ buf.put(UnsafeMemoryChunk.readAbsoluteByte(addr));
+ addr++;
+ bytesThisTime--;
+ }
+ buf.flip();
+ while (buf.remaining() > 0) {
+ sc.write(buf);
+ }
+ buf.clear();
+ }
+ }
+ } else {
+ HeapDataOutputStream hdos = (HeapDataOutputStream)this.part;
+ hdos.sendTo(sc, buf);
+ hdos.rewind(); // presumably so the same stream can be sent again - TODO confirm
+ }
+ }
+ }
+
+ /**
+  * Returns the symbolic name of the given type code, or
+  * "unknown code N" for a value that is none of the three known codes.
+  */
+ private static String typeCodeToString(byte c) {
+   if (c == BYTE_CODE) {
+     return "BYTE_CODE";
+   }
+   if (c == OBJECT_CODE) {
+     return "OBJECT_CODE";
+   }
+   if (c == EMPTY_BYTEARRAY_CODE) {
+     return "EMPTY_BYTEARRAY_CODE";
+   }
+   return "unknown code " + c;
+ }
+
+ /**
+  * Describes this part by the name of its type code and its length.
+  * The payload bytes themselves are not included.
+  */
+ @Override
+ public String toString() {
+   return "partCode=" + typeCodeToString(this.typeCode)
+       + " partLength=" + getLength();
+ }
+
+ /**
+  * Records the version handed to CacheServerHelper.deserialize when an
+  * object payload is read with getObject.
+  *
+  * @param clientVersion the version to remember, may be null
+  */
+ public void setVersion(Version clientVersion) {
+   version = clientVersion;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c741a68f/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
index 0000000,82e8114..1975601
mode 000000,100644..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/TXSynchronizationCommand.java
@@@ -1,0 -1,209 +1,210 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package com.gemstone.gemfire.internal.cache.tier.sockets.command;
+
+ import com.gemstone.gemfire.cache.SynchronizationCommitConflictException;
+ import com.gemstone.gemfire.cache.client.internal.TXSynchronizationOp.CompletionType;
+ import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+ import com.gemstone.gemfire.distributed.internal.ReplyException;
+ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+ import com.gemstone.gemfire.internal.cache.TXCommitMessage;
+ import com.gemstone.gemfire.internal.cache.TXManagerImpl;
+ import com.gemstone.gemfire.internal.cache.TXStateProxy;
+ import com.gemstone.gemfire.internal.cache.TXSynchronizationRunnable;
+ import com.gemstone.gemfire.internal.cache.tier.Command;
+ import com.gemstone.gemfire.internal.cache.tier.MessageType;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
++import com.gemstone.gemfire.internal.cache.tier.sockets.MessageTooLargeException;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+ import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+
+ import java.io.IOException;
+ import java.util.concurrent.Executor;
+
+ import javax.transaction.Status;
+
+ public class TXSynchronizationCommand extends BaseCommand {
+
+ private final static TXSynchronizationCommand singleton = new TXSynchronizationCommand();
+
+ /** Returns the single shared instance of this command. */
+ public static Command getCommand() {
+   return TXSynchronizationCommand.singleton;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand#shouldMasqueradeForTx(com.gemstone.gemfire.internal.cache.tier.sockets.Message, com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection)
+ */
+ @Override
+ protected boolean shouldMasqueradeForTx(Message msg, ServerConnection servConn) {
+ // masquerading is done in the waiting thread pool
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.internal.cache.tier.sockets.BaseCommand#cmdExecute(com.gemstone.gemfire.internal.cache.tier.sockets.Message, com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection, long)
+ *
+ * Handles a transaction synchronization request.
+ * Message layout as read here: part 0 is the ordinal of the CompletionType,
+ * part 1 is an int that is read but not used, and part 2 (only looked at for
+ * AFTER_COMPLETION) carries the transaction status.
+ *
+ * BEFORE_COMPLETION: a runnable that masquerades as the transaction, calls
+ * beforeCompletion() and writes the reply is wrapped in a
+ * TXSynchronizationRunnable, remembered on the TXStateProxy, handed to the
+ * waiting thread pool, and this method waits for its first execution.
+ *
+ * Otherwise: a runnable that calls afterCompletion(status) and writes the
+ * commit response is built. If a TXSynchronizationRunnable was remembered it
+ * runs this as its second runnable; if not, the runnable is run on the
+ * calling thread, preceded by beforeCompletion() when the status is
+ * STATUS_COMMITTED.
+ *
+ * Nothing is done after the initial setup when no TXStateProxy is found.
+ * NOTE(review): in that case REQUIRES_RESPONSE has been set but no reply is
+ * written by this method - confirm the caller deals with it.
+ */
+ @Override
+ public void cmdExecute(final Message msg, final ServerConnection servConn, long start)
+ throws IOException, ClassNotFoundException, InterruptedException {
+
+ servConn.setAsTrue(REQUIRES_RESPONSE);
+
+ CompletionType type = CompletionType.values()[msg.getPart(0).getInt()];
+ /*int txIdInt =*/ msg.getPart(1).getInt(); // [bruce] not sure if we need to transmit this
+ final Part statusPart;
+ if (type == CompletionType.AFTER_COMPLETION) {
+ statusPart = msg.getPart(2);
+ } else {
+ statusPart = null;
+ }
+
+ final TXManagerImpl txMgr = (TXManagerImpl)servConn.getCache().getCacheTransactionManager();
+ final InternalDistributedMember member = (InternalDistributedMember)servConn.getProxyID().getDistributedMember();
+
+ // get the tx state without associating it with this thread. That's done later
+ final TXStateProxy txProxy = txMgr.masqueradeAs(msg, member, true);
+
+ // we have to run beforeCompletion and afterCompletion in the same thread
+ // because beforeCompletion obtains locks for the thread and afterCompletion
+ // releases them
+ if (txProxy != null) {
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ try {
+ if (type == CompletionType.BEFORE_COMPLETION) {
+ Runnable beforeCompletion = new Runnable() {
+ @SuppressWarnings("synthetic-access")
+ public void run() {
+ TXStateProxy txState = null;
+ Throwable failureException = null;
+ try {
+ txState = txMgr.masqueradeAs(msg, member, false);
+ if (isDebugEnabled) {
+ logger.debug("Executing beforeCompletion() notification for transaction {}", msg.getTransactionId());
+ }
+ txState.setIsJTA(true);
+ txState.beforeCompletion();
+ try {
+ writeReply(msg, servConn);
+ } catch (IOException e) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", e);
+ }
+ }
+ servConn.setAsTrue(RESPONDED);
+ } catch (ReplyException e) {
+ failureException = e.getCause(); // NOTE(review): a ReplyException without a cause leaves this null, so no exception reply is written below - confirm that cannot happen
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ failureException = e;
+ } finally {
+ txMgr.unmasquerade(txState);
+ }
+ if (failureException != null) { // report the failure to the client once the masquerade has been undone
+ try {
+ writeException(msg, failureException, false, servConn);
+ } catch (IOException ioe) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", ioe);
+ }
+ }
+ servConn.setAsTrue(RESPONDED);
+ }
+ }
+ };
+ TXSynchronizationRunnable sync = new TXSynchronizationRunnable(beforeCompletion);
+ txProxy.setSynchronizationRunnable(sync); // remembered so the later afterCompletion request can find it
+ Executor exec = InternalDistributedSystem.getConnectedInstance().getDistributionManager().getWaitingThreadPool();
+ exec.execute(sync);
+ sync.waitForFirstExecution();
+ } else {
+ Runnable afterCompletion = new Runnable() {
+ @SuppressWarnings("synthetic-access")
+ public void run() {
+ TXStateProxy txState = null;
+ try {
+ txState = txMgr.masqueradeAs(msg, member, false);
+ int status = statusPart.getInt();
+ if (isDebugEnabled) {
+ logger.debug("Executing afterCompletion({}) notification for transaction {}", status, msg.getTransactionId());
+ }
+ txState.setIsJTA(true);
+ txState.afterCompletion(status);
+ // GemFire commits during afterCompletion - send the commit info back to the client
+ // where it can be applied to the local cache
+ TXCommitMessage cmsg = txState.getCommitMessage();
+ try {
+ CommitCommand.writeCommitResponse(cmsg, msg, servConn);
+ txMgr.removeHostedTXState(txState.getTxId());
+ } catch (IOException e) {
+ // not much can be done here
- if (isDebugEnabled) {
- logger.debug("Problem writing reply to client", e);
++ if (isDebugEnabled || (e instanceof MessageTooLargeException)) {
++ logger.warn("Problem writing reply to client", e);
+ } // NOTE(review): with debug enabled every IOException is now logged at warn level, not only MessageTooLargeException - confirm intended
+ }
+ servConn.setAsTrue(RESPONDED);
+ } catch (RuntimeException e) {
+ try {
+ writeException(msg, e, false, servConn);
+ } catch (IOException ioe) {
+ if (isDebugEnabled) {
+ logger.debug("Problem writing reply to client", ioe);
+ }
+ }
+ servConn.setAsTrue(RESPONDED);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ txMgr.unmasquerade(txState);
+ }
+ }
+ };
+ // if there was a beforeCompletion call then there will be a thread
+ // sitting in the waiting pool to execute afterCompletion. Otherwise
+ // we have failed-over and may need to do beforeCompletion & hope that it works
+ TXSynchronizationRunnable sync = txProxy.getSynchronizationRunnable();
+ if (sync != null) {
+ sync.runSecondRunnable(afterCompletion);
+ } else {
+ if (statusPart.getInt() == Status.STATUS_COMMITTED) {
+ TXStateProxy txState = txMgr.masqueradeAs(msg, member, false);
+ try {
+ if (isDebugEnabled) {
+ logger.debug("Executing beforeCompletion() notification for transaction {} after failover", msg.getTransactionId());
+ }
+ txState.setIsJTA(true);
+ txState.beforeCompletion();
+ } finally {
+ txMgr.unmasquerade(txState);
+ }
+ }
+ afterCompletion.run(); // runs on the calling thread, the same one that just did beforeCompletion
+ }
+ }
+ } catch (Exception e) {
+ writeException(msg, MessageType.EXCEPTION, e, false, servConn);
+ servConn.setAsTrue(RESPONDED);
+ }
+ if (isDebugEnabled) {
+ logger.debug("Sent tx synchronization response");
+ }
+ }
+ }
+
+ }