You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/31 23:15:26 UTC
[27/35] geode git commit: GEODE-2632: refactoring preparations for
SecurityService and BaseCommand changes
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
index f102b2d..1f9ef91 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
@@ -34,7 +34,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
-import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Map;
@@ -47,7 +46,7 @@ import java.util.concurrent.TimeUnit;
* and serialize it out to the wire.
*
* <PRE>
- * msgType - int - 4 bytes type of message, types enumerated below
+ * messageType - int - 4 bytes type of message, types enumerated below
*
* msgLength - int - 4 bytes total length of variable length payload
*
@@ -55,10 +54,10 @@ import java.util.concurrent.TimeUnit;
* contained in the payload. Message can
* be a multi-part message
*
- * transId - int - 4 bytes filled in by the requestor, copied back into
+ * transId - int - 4 bytes filled in by the requester, copied back into
* the response
*
- * flags - byte- 1 byte filled in by the requestor
+ * flags - byte- 1 byte filled in by the requester
* len1
* part1
* .
@@ -76,18 +75,17 @@ import java.util.concurrent.TimeUnit;
*
* See also <a href="package-summary.html#messages">package description</a>.
*
- * @see org.apache.geode.internal.cache.tier.MessageType
- *
+ * @see MessageType
*/
public class Message {
- public static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824;
- /**
- * maximum size of an outgoing message. See GEODE-478
- */
- public static int MAX_MESSAGE_SIZE =
- Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size",
- DEFAULT_MAX_MESSAGE_SIZE).intValue();
+ // Tentative workaround to avoid OOM stated in #46754.
+ public static final ThreadLocal<Integer> MESSAGE_TYPE = new ThreadLocal<>();
+
+ public static final String MAX_MESSAGE_SIZE_PROPERTY =
+ DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size";
+
+ static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824;
private static final Logger logger = LogService.getLogger();
@@ -97,83 +95,89 @@ public class Message {
private static final ThreadLocal<ByteBuffer> tlCommBuffer = new ThreadLocal<>();
- private static final byte[] TRUE;
- private static final byte[] FALSE;
+ // These two statics are fields shoved into the flags byte for transmission.
+ // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other
+ // is left in place
+ private static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02;
+ private static final byte MESSAGE_IS_RETRY = (byte) 0x04;
+
+ private static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB;
- static {
- try {
- HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
+ private static final int DEFAULT_CHUNK_SIZE = 1024;
+
+ private static final byte[] TRUE = defineTrue();
+ private static final byte[] FALSE = defineFalse();
+
+ private static byte[] defineTrue() {
+ try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) {
BlobHelper.serializeTo(Boolean.TRUE, hdos);
- TRUE = hdos.toByteArray();
- } catch (Exception e) {
+ return hdos.toByteArray();
+ } catch (IOException e) {
throw new IllegalStateException(e);
}
+ }
- try {
- HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
+ private static byte[] defineFalse() {
+ try (HeapDataOutputStream hdos = new HeapDataOutputStream(10, null)) {
BlobHelper.serializeTo(Boolean.FALSE, hdos);
- FALSE = hdos.toByteArray();
- } catch (Exception e) {
+ return hdos.toByteArray();
+ } catch (IOException e) {
throw new IllegalStateException(e);
}
}
- protected int msgType;
- protected int payloadLength = 0;
- protected int numberOfParts = 0;
+ /**
+ * maximum size of an outgoing message. See GEODE-478
+ */
+ private final int maxMessageSize;
+
+ protected int messageType;
+ private int payloadLength = 0;
+ int numberOfParts = 0;
protected int transactionId = TXManagerImpl.NOTX;
- protected int currentPart = 0;
- protected Part[] partsList = null;
- protected ByteBuffer cachedCommBuffer;
+ int currentPart = 0;
+ private Part[] partsList = null;
+ private ByteBuffer cachedCommBuffer;
protected Socket socket = null;
- protected SocketChannel sockCh = null;
- protected OutputStream os = null;
- protected InputStream is = null;
- protected boolean messageModified = true;
+ private SocketChannel socketChannel = null;
+ private OutputStream outputStream = null;
+ protected InputStream inputStream = null;
+ private boolean messageModified = true;
+
/** is this message a retry of a previously sent message? */
- protected boolean isRetry;
+ private boolean isRetry;
+
private byte flags = 0x00;
- protected MessageStats msgStats = null;
- protected ServerConnection sc = null;
+ MessageStats messageStats = null;
+ protected ServerConnection serverConnection = null;
private int maxIncomingMessageLength = -1;
private Semaphore dataLimiter = null;
- // private int MAX_MSGS = -1;
- private Semaphore msgLimiter = null;
- private boolean hdrRead = false;
- private int chunkSize = 1024;// Default Chunk Size.
+ private Semaphore messageLimiter = null;
+ private boolean readHeader = false;
+ private int chunkSize = DEFAULT_CHUNK_SIZE;
- protected Part securePart = null;
+ Part securePart = null;
private boolean isMetaRegion = false;
-
- // These two statics are fields shoved into the flags byte for transmission.
- // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the other
- // is left in place
- public static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02;
- public static final byte MESSAGE_IS_RETRY = (byte) 0x04;
-
- public static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB;
-
- // Tentative workaround to avoid OOM stated in #46754.
- public static final ThreadLocal<Integer> messageType = new ThreadLocal<Integer>();
-
- Version version;
+ private Version version;
/**
* Creates a new message with the given number of parts
*/
public Message(int numberOfParts, Version destVersion) {
+ this.maxMessageSize = Integer.getInteger(MAX_MESSAGE_SIZE_PROPERTY, DEFAULT_MAX_MESSAGE_SIZE);
this.version = destVersion;
Assert.assertTrue(destVersion != null, "Attempt to create an unversioned message");
- partsList = new Part[numberOfParts];
+ this.partsList = new Part[numberOfParts];
this.numberOfParts = numberOfParts;
- for (int i = 0; i < partsList.length; i++) {
- partsList[i] = new Part();
+ int partsListLength = this.partsList.length;
+ for (int i = 0; i < partsListLength; i++) {
+ this.partsList[i] = new Part();
}
}
public boolean isSecureMode() {
- return securePart != null;
+ return this.securePart != null;
}
public byte[] getSecureBytes() throws IOException, ClassNotFoundException {
@@ -186,7 +190,7 @@ public class Message {
throw new IllegalArgumentException(
LocalizedStrings.Message_INVALID_MESSAGETYPE.toLocalizedString());
}
- this.msgType = msgType;
+ this.messageType = msgType;
}
public void setVersion(Version clientVersion) {
@@ -194,17 +198,15 @@ public class Message {
}
public void setMessageHasSecurePartFlag() {
- this.flags = (byte) (this.flags | MESSAGE_HAS_SECURE_PART);
+ this.flags |= MESSAGE_HAS_SECURE_PART;
}
public void clearMessageHasSecurePartFlag() {
- this.flags = (byte) (this.flags & MESSAGE_HAS_SECURE_PART);
+ this.flags &= MESSAGE_HAS_SECURE_PART;
}
/**
* Sets and builds the {@link Part}s that are sent in the payload of the Message
- *
- * @param numberOfParts
*/
public void setNumberOfParts(int numberOfParts) {
// hitesh: need to add security header here from server
@@ -227,9 +229,7 @@ public class Message {
}
/**
- * For boundary testing we may need to inject mock parts
- *
- * @param parts
+ * For boundary testing we may need to inject mock parts. For testing only.
*/
void setParts(Part[] parts) {
this.partsList = parts;
@@ -260,7 +260,7 @@ public class Message {
/**
* When building a Message this will return the number of the next Part to be added to the message
*/
- public int getNextPartNumber() {
+ int getNextPartNumber() {
return this.currentPart;
}
@@ -268,33 +268,36 @@ public class Message {
addStringPart(str, false);
}
- private static final Map<String, byte[]> CACHED_STRINGS = new ConcurrentHashMap<String, byte[]>();
+ private static final Map<String, byte[]> CACHED_STRINGS = new ConcurrentHashMap<>();
public void addStringPart(String str, boolean enableCaching) {
if (str == null) {
- addRawPart((byte[]) null, false);
- } else {
- Part part = partsList[this.currentPart];
- if (enableCaching) {
- byte[] bytes = CACHED_STRINGS.get(str);
- if (bytes == null) {
- HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+ addRawPart(null, false);
+ return;
+ }
+
+ Part part = this.partsList[this.currentPart];
+ if (enableCaching) {
+ byte[] bytes = CACHED_STRINGS.get(str);
+ if (bytes == null) {
+ try (HeapDataOutputStream hdos = new HeapDataOutputStream(str)) {
bytes = hdos.toByteArray();
CACHED_STRINGS.put(str, bytes);
}
- part.setPartState(bytes, false);
- } else {
- HeapDataOutputStream hdos = new HeapDataOutputStream(str);
- this.messageModified = true;
- part.setPartState(hdos, false);
}
- this.currentPart++;
+ part.setPartState(bytes, false);
+
+ } else {
+ // do NOT close the HeapDataOutputStream
+ this.messageModified = true;
+ part.setPartState(new HeapDataOutputStream(str), false);
}
+ this.currentPart++;
}
/*
- * Adds a new part to this message that contains a <code>byte</code> array (as opposed to a
- * serialized object).
+ * Adds a new part to this message that contains a {@code byte} array (as opposed to a serialized
+ * object).
*
* @see #addPart(byte[], boolean)
*/
@@ -312,13 +315,6 @@ public class Message {
}
}
- public void addDeltaPart(HeapDataOutputStream hdos) {
- this.messageModified = true;
- Part part = partsList[this.currentPart];
- part.setPartState(hdos, false);
- this.currentPart++;
- }
-
public void addObjPart(Object o) {
addObjPart(o, false);
}
@@ -345,6 +341,9 @@ public class Message {
}
}
+ /**
+ * Object o is always null
+ */
public void addPartInAnyForm(@Unretained Object o, boolean isObject) {
if (o == null) {
addRawPart((byte[]) o, false);
@@ -353,7 +352,7 @@ public class Message {
} else if (o instanceof StoredObject) {
// It is possible it is an off-heap StoredObject that contains a simple non-object byte[].
this.messageModified = true;
- Part part = partsList[this.currentPart];
+ Part part = this.partsList[this.currentPart];
part.setPartState((StoredObject) o, isObject);
this.currentPart++;
} else {
@@ -362,59 +361,58 @@ public class Message {
}
private void serializeAndAddPartNoCopying(Object o) {
- HeapDataOutputStream hdos;
- Version v = version;
- if (version.equals(Version.CURRENT)) {
+ Version v = this.version;
+ if (this.version.equals(Version.CURRENT)) {
v = null;
}
- // create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources
- // passed to it.
- hdos = new HeapDataOutputStream(chunkSize, v, true);
+
+ // Create the HDOS with a flag telling it that it can keep any byte[] or ByteBuffers/ByteSources
+ // passed to it. Do NOT close the HeapDataOutputStream!
+ HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v, true);
try {
BlobHelper.serializeTo(o, hdos);
} catch (IOException ex) {
throw new SerializationException("failed serializing object", ex);
}
this.messageModified = true;
- Part part = partsList[this.currentPart];
+ Part part = this.partsList[this.currentPart];
part.setPartState(hdos, true);
this.currentPart++;
-
}
private void serializeAndAddPart(Object o, boolean zipValues) {
if (zipValues) {
throw new UnsupportedOperationException("zipValues no longer supported");
+ }
- } else {
- HeapDataOutputStream hdos;
- Version v = version;
- if (version.equals(Version.CURRENT)) {
- v = null;
- }
- hdos = new HeapDataOutputStream(chunkSize, v);
- try {
- BlobHelper.serializeTo(o, hdos);
- } catch (IOException ex) {
- throw new SerializationException("failed serializing object", ex);
- }
- this.messageModified = true;
- Part part = partsList[this.currentPart];
- part.setPartState(hdos, true);
- this.currentPart++;
+ Version v = this.version;
+ if (this.version.equals(Version.CURRENT)) {
+ v = null;
}
+
+ // do NOT close the HeapDataOutputStream
+ HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v);
+ try {
+ BlobHelper.serializeTo(o, hdos);
+ } catch (IOException ex) {
+ throw new SerializationException("failed serializing object", ex);
+ }
+ this.messageModified = true;
+ Part part = this.partsList[this.currentPart];
+ part.setPartState(hdos, true);
+ this.currentPart++;
}
public void addIntPart(int v) {
this.messageModified = true;
- Part part = partsList[this.currentPart];
+ Part part = this.partsList[this.currentPart];
part.setInt(v);
this.currentPart++;
}
public void addLongPart(long v) {
this.messageModified = true;
- Part part = partsList[this.currentPart];
+ Part part = this.partsList[this.currentPart];
part.setLong(v);
this.currentPart++;
}
@@ -424,13 +422,13 @@ public class Message {
*/
public void addRawPart(byte[] newPart, boolean isObject) {
this.messageModified = true;
- Part part = partsList[this.currentPart];
+ Part part = this.partsList[this.currentPart];
part.setPartState(newPart, isObject);
this.currentPart++;
}
public int getMessageType() {
- return this.msgType;
+ return this.messageType;
}
public int getPayloadLength() {
@@ -451,7 +449,7 @@ public class Message {
public Part getPart(int index) {
if (index < this.numberOfParts) {
- Part p = partsList[index];
+ Part p = this.partsList[index];
if (this.version != null) {
p.setVersion(this.version);
}
@@ -480,9 +478,9 @@ public class Message {
if (len != 0) {
this.payloadLength = 0;
}
- if (this.hdrRead) {
- if (this.msgStats != null) {
- this.msgStats.decMessagesBeingReceived(len);
+ if (this.readHeader) {
+ if (this.messageStats != null) {
+ this.messageStats.decMessagesBeingReceived(len);
}
}
ByteBuffer buffer = getCommBuffer();
@@ -495,20 +493,19 @@ public class Message {
this.dataLimiter = null;
this.maxIncomingMessageLength = 0;
}
- if (this.hdrRead) {
- if (this.msgLimiter != null) {
- this.msgLimiter.release(1);
- this.msgLimiter = null;
+ if (this.readHeader) {
+ if (this.messageLimiter != null) {
+ this.messageLimiter.release(1);
+ this.messageLimiter = null;
}
- this.hdrRead = false;
+ this.readHeader = false;
}
this.flags = 0;
}
protected void packHeaderInfoForSending(int msgLen, boolean isSecurityHeader) {
- // hitesh: setting second bit of flags byte for client
- // this is not require but this makes all changes easily at client side right now
- // just see this bit and process security header
+ // setting second bit of flags byte for client this is not require but this makes all changes
+ // easily at client side right now just see this bit and process security header
byte flagsByte = this.flags;
if (isSecurityHeader) {
flagsByte |= MESSAGE_HAS_SECURE_PART;
@@ -516,14 +513,14 @@ public class Message {
if (this.isRetry) {
flagsByte |= MESSAGE_IS_RETRY;
}
- getCommBuffer().putInt(this.msgType).putInt(msgLen).putInt(this.numberOfParts)
+ getCommBuffer().putInt(this.messageType).putInt(msgLen).putInt(this.numberOfParts)
.putInt(this.transactionId).put(flagsByte);
}
protected Part getSecurityPart() {
- if (this.sc != null) {
+ if (this.serverConnection != null) {
// look types right put get etc
- return this.sc.updateAndGetSecurityPart();
+ return this.serverConnection.updateAndGetSecurityPart();
}
return null;
}
@@ -537,7 +534,7 @@ public class Message {
this.isMetaRegion = isMetaRegion;
}
- public boolean getAndResetIsMetaRegion() {
+ boolean getAndResetIsMetaRegion() {
boolean isMetaRegion = this.isMetaRegion;
this.isMetaRegion = false;
return isMetaRegion;
@@ -546,21 +543,20 @@ public class Message {
/**
* Sends this message out on its socket.
*/
- protected void sendBytes(boolean clearMessage) throws IOException {
- if (this.sc != null) {
+ void sendBytes(boolean clearMessage) throws IOException {
+ if (this.serverConnection != null) {
// Keep track of the fact that we are making progress.
- this.sc.updateProcessingMessage();
+ this.serverConnection.updateProcessingMessage();
}
if (this.socket == null) {
throw new IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
}
try {
- final ByteBuffer cb = getCommBuffer();
- if (cb == null) {
+ final ByteBuffer commBuffer = getCommBuffer();
+ if (commBuffer == null) {
throw new IOException("No buffer");
}
- int msgLen = 0;
- synchronized (cb) {
+ synchronized (commBuffer) {
long totalPartLen = 0;
long headerLen = 0;
int partsToTransmit = this.numberOfParts;
@@ -581,50 +577,50 @@ public class Message {
partsToTransmit++;
}
- if ((headerLen + totalPartLen) > Integer.MAX_VALUE) {
+ if (headerLen + totalPartLen > Integer.MAX_VALUE) {
throw new MessageTooLargeException(
"Message size (" + (headerLen + totalPartLen) + ") exceeds maximum integer value");
}
- msgLen = (int) (headerLen + totalPartLen);
+ int msgLen = (int) (headerLen + totalPartLen);
- if (msgLen > MAX_MESSAGE_SIZE) {
+ if (msgLen > this.maxMessageSize) {
throw new MessageTooLargeException("Message size (" + msgLen
- + ") exceeds gemfire.client.max-message-size setting (" + MAX_MESSAGE_SIZE + ")");
+ + ") exceeds gemfire.client.max-message-size setting (" + this.maxMessageSize + ")");
}
- cb.clear();
- packHeaderInfoForSending(msgLen, (securityPart != null));
+ commBuffer.clear();
+ packHeaderInfoForSending(msgLen, securityPart != null);
for (int i = 0; i < partsToTransmit; i++) {
- Part part = (i == this.numberOfParts) ? securityPart : partsList[i];
+ Part part = i == this.numberOfParts ? securityPart : this.partsList[i];
- if (cb.remaining() < PART_HEADER_SIZE) {
+ if (commBuffer.remaining() < PART_HEADER_SIZE) {
flushBuffer();
}
int partLen = part.getLength();
- cb.putInt(partLen);
- cb.put(part.getTypeCode());
- if (partLen <= cb.remaining()) {
- part.writeTo(cb);
+ commBuffer.putInt(partLen);
+ commBuffer.put(part.getTypeCode());
+ if (partLen <= commBuffer.remaining()) {
+ part.writeTo(commBuffer);
} else {
flushBuffer();
- if (this.sockCh != null) {
- part.writeTo(this.sockCh, cb);
+ if (this.socketChannel != null) {
+ part.writeTo(this.socketChannel, commBuffer);
} else {
- part.writeTo(this.os, cb);
+ part.writeTo(this.outputStream, commBuffer);
}
- if (this.msgStats != null) {
- this.msgStats.incSentBytes(partLen);
+ if (this.messageStats != null) {
+ this.messageStats.incSentBytes(partLen);
}
}
}
- if (cb.position() != 0) {
+ if (commBuffer.position() != 0) {
flushBuffer();
}
this.messageModified = false;
- if (this.sockCh == null) {
- this.os.flush();
+ if (this.socketChannel == null) {
+ this.outputStream.flush();
}
}
} finally {
@@ -634,69 +630,67 @@ public class Message {
}
}
- protected void flushBuffer() throws IOException {
+ void flushBuffer() throws IOException {
final ByteBuffer cb = getCommBuffer();
- if (this.sockCh != null) {
+ if (this.socketChannel != null) {
cb.flip();
do {
- this.sockCh.write(cb);
+ this.socketChannel.write(cb);
} while (cb.remaining() > 0);
} else {
- this.os.write(cb.array(), 0, cb.position());
+ this.outputStream.write(cb.array(), 0, cb.position());
}
- if (this.msgStats != null) {
- this.msgStats.incSentBytes(cb.position());
+ if (this.messageStats != null) {
+ this.messageStats.incSentBytes(cb.position());
}
cb.clear();
}
private void read() throws IOException {
clearParts();
- // TODO:Hitesh ??? for server changes make sure sc is not null as this class also used by client
- // :(
+ // TODO: for server changes make sure sc is not null as this class also used by client
readHeaderAndPayload();
}
/**
* Read the actual bytes of the header off the socket
*/
- protected void fetchHeader() throws IOException {
+ void fetchHeader() throws IOException {
final ByteBuffer cb = getCommBuffer();
cb.clear();
- // msgType is invalidated here and can be used as an indicator
- // of problems reading the message
- this.msgType = MessageType.INVALID;
- int hdr = 0;
+ // messageType is invalidated here and can be used as an indicator
+ // of problems reading the message
+ this.messageType = MessageType.INVALID;
final int headerLength = getHeaderLength();
- if (this.sockCh != null) {
+ if (this.socketChannel != null) {
cb.limit(headerLength);
do {
- int bytesRead = this.sockCh.read(cb);
- // System.out.println("DEBUG: fetchHeader read " + bytesRead + " bytes commBuffer=" + cb);
+ int bytesRead = this.socketChannel.read(cb);
if (bytesRead == -1) {
throw new EOFException(
LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER
.toLocalizedString());
}
- if (this.msgStats != null) {
- this.msgStats.incReceivedBytes(bytesRead);
+ if (this.messageStats != null) {
+ this.messageStats.incReceivedBytes(bytesRead);
}
} while (cb.remaining() > 0);
cb.flip();
+
} else {
+ int hdr = 0;
do {
- int bytesRead = -1;
- bytesRead = this.is.read(cb.array(), hdr, headerLength - hdr);
+ int bytesRead = this.inputStream.read(cb.array(), hdr, headerLength - hdr);
if (bytesRead == -1) {
throw new EOFException(
LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER
.toLocalizedString());
}
hdr += bytesRead;
- if (this.msgStats != null) {
- this.msgStats.incReceivedBytes(bytesRead);
+ if (this.messageStats != null) {
+ this.messageStats.incReceivedBytes(bytesRead);
}
} while (hdr < headerLength);
@@ -717,34 +711,36 @@ public class Message {
if (!MessageType.validate(type)) {
throw new IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER
- .toLocalizedString(Integer.valueOf(type)));
+ .toLocalizedString(type));
}
+
int timeToWait = 0;
- if (this.sc != null) {
+ if (this.serverConnection != null) {
// Keep track of the fact that a message is being processed.
- this.sc.setProcessingMessage();
- timeToWait = sc.getClientReadTimeout();
+ this.serverConnection.setProcessingMessage();
+ timeToWait = this.serverConnection.getClientReadTimeout();
}
- this.hdrRead = true;
- if (this.msgLimiter != null) {
+ this.readHeader = true;
+
+ if (this.messageLimiter != null) {
for (;;) {
- this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+ this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
if (timeToWait == 0) {
- this.msgLimiter.acquire(1);
+ this.messageLimiter.acquire(1);
} else {
- if (!this.msgLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) {
- if (this.msgStats != null && this.msgStats instanceof CacheServerStats) {
- ((CacheServerStats) this.msgStats).incConnectionsTimedOut();
+ if (!this.messageLimiter.tryAcquire(1, timeToWait, TimeUnit.MILLISECONDS)) {
+ if (this.messageStats instanceof CacheServerStats) {
+ ((CacheServerStats) this.messageStats).incConnectionsTimedOut();
}
throw new IOException(
LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_MESSAGE_LIMITER_AFTER_WAITING_0_MILLISECONDS
- .toLocalizedString(Integer.valueOf(timeToWait)));
+ .toLocalizedString(timeToWait));
}
}
break;
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -753,16 +749,17 @@ public class Message {
}
} // for
}
+
if (len > 0) {
if (this.maxIncomingMessageLength > 0 && len > this.maxIncomingMessageLength) {
throw new IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1
- .toLocalizedString(new Object[] {Integer.valueOf(len),
- Integer.valueOf(this.maxIncomingMessageLength)}));
+ .toLocalizedString(new Object[] {len, this.maxIncomingMessageLength}));
}
+
if (this.dataLimiter != null) {
for (;;) {
- if (sc != null) {
- this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+ if (this.serverConnection != null) {
+ this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null);
}
boolean interrupted = Thread.interrupted();
try {
@@ -770,21 +767,21 @@ public class Message {
this.dataLimiter.acquire(len);
} else {
int newTimeToWait = timeToWait;
- if (this.msgLimiter != null) {
+ if (this.messageLimiter != null) {
// may have waited for msg limit so recalc time to wait
- newTimeToWait -= (int) sc.getCurrentMessageProcessingTime();
+ newTimeToWait -= (int) this.serverConnection.getCurrentMessageProcessingTime();
}
if (newTimeToWait <= 0
- || !this.msgLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) {
+ || !this.messageLimiter.tryAcquire(1, newTimeToWait, TimeUnit.MILLISECONDS)) {
throw new IOException(
LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_DATA_LIMITER_AFTER_WAITING_0_MILLISECONDS
.toLocalizedString(timeToWait));
}
}
- this.payloadLength = len; // makes sure payloadLength gets set now so we will release
- // the semaphore
+ // makes sure payloadLength gets set now so we will release the semaphore
+ this.payloadLength = len;
break; // success
- } catch (InterruptedException e) {
+ } catch (InterruptedException ignore) {
interrupted = true;
} finally {
if (interrupted) {
@@ -794,15 +791,15 @@ public class Message {
}
}
}
- if (this.msgStats != null) {
- this.msgStats.incMessagesBeingReceived(len);
+ if (this.messageStats != null) {
+ this.messageStats.incMessagesBeingReceived(len);
this.payloadLength = len; // makes sure payloadLength gets set now so we will dec on clear
}
this.isRetry = (bits & MESSAGE_IS_RETRY) != 0;
- bits = (byte) (bits & MESSAGE_IS_RETRY_MASK);
+ bits &= MESSAGE_IS_RETRY_MASK;
this.flags = bits;
- this.msgType = type;
+ this.messageType = type;
readPayloadFields(numParts, len);
@@ -813,32 +810,38 @@ public class Message {
// this.numberOfParts = numParts; Already set in setPayloadFields via setNumberOfParts
this.transactionId = txid;
this.flags = bits;
- if (this.sc != null) {
+ if (this.serverConnection != null) {
// Keep track of the fact that a message is being processed.
- this.sc.updateProcessingMessage();
+ this.serverConnection.updateProcessingMessage();
}
}
- protected void readPayloadFields(final int numParts, final int len) throws IOException {
+ /**
+ * TODO: refactor overly long method readPayloadFields
+ */
+ void readPayloadFields(final int numParts, final int len) throws IOException {
if (len > 0 && numParts <= 0 || len <= 0 && numParts > 0) {
throw new IOException(
LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT
- .toLocalizedString(new Object[] {Integer.valueOf(len), Integer.valueOf(numParts)}));
+ .toLocalizedString(new Object[] {len, numParts}));
}
- Integer msgType = messageType.get();
+ Integer msgType = MESSAGE_TYPE.get();
if (msgType != null && msgType == MessageType.PING) {
- messageType.set(null); // set it to null right away.
- int pingParts = 10; // Some number which will not throw OOM but still be acceptable for a ping
- // operation.
+ // set it to null right away.
+ MESSAGE_TYPE.set(null);
+ // Some number which will not throw OOM but still be acceptable for a ping operation.
+ int pingParts = 10;
if (numParts > pingParts) {
throw new IOException("Part length ( " + numParts + " ) is inconsistent for "
+ MessageType.getString(msgType) + " operation.");
}
}
+
setNumberOfParts(numParts);
- if (numParts <= 0)
+ if (numParts <= 0) {
return;
+ }
if (len < 0) {
logger.info(LocalizedMessage.create(LocalizedStrings.Message_RPL_NEG_LEN__0, len));
@@ -849,12 +852,11 @@ public class Message {
cb.clear();
cb.flip();
- int readSecurePart = 0;
- readSecurePart = checkAndSetSecurityPart();
+ int readSecurePart = checkAndSetSecurityPart();
int bytesRemaining = len;
- for (int i = 0; ((i < numParts + readSecurePart)
- || ((readSecurePart == 1) && (cb.remaining() > 0))); i++) {
+ for (int i = 0; i < numParts + readSecurePart
+ || readSecurePart == 1 && cb.remaining() > 0; i++) {
int bytesReadThisTime = readPartChunk(bytesRemaining);
bytesRemaining -= bytesReadThisTime;
@@ -869,6 +871,7 @@ public class Message {
int partLen = cb.getInt();
byte partType = cb.get();
byte[] partBytes = null;
+
if (partLen > 0) {
partBytes = new byte[partLen];
int alreadyReadBytes = cb.remaining();
@@ -878,26 +881,27 @@ public class Message {
}
cb.get(partBytes, 0, alreadyReadBytes);
}
+
// now we need to read partLen - alreadyReadBytes off the wire
int off = alreadyReadBytes;
int remaining = partLen - off;
while (remaining > 0) {
- if (this.sockCh != null) {
+ if (this.socketChannel != null) {
int bytesThisTime = remaining;
cb.clear();
if (bytesThisTime > cb.capacity()) {
bytesThisTime = cb.capacity();
}
cb.limit(bytesThisTime);
- int res = this.sockCh.read(cb);
+ int res = this.socketChannel.read(cb);
if (res != -1) {
cb.flip();
bytesRemaining -= res;
remaining -= res;
cb.get(partBytes, off, res);
off += res;
- if (this.msgStats != null) {
- this.msgStats.incReceivedBytes(res);
+ if (this.messageStats != null) {
+ this.messageStats.incReceivedBytes(res);
}
} else {
throw new EOFException(
@@ -905,14 +909,13 @@ public class Message {
.toLocalizedString());
}
} else {
- int res = 0;
- res = this.is.read(partBytes, off, remaining);
+ int res = this.inputStream.read(partBytes, off, remaining);
if (res != -1) {
bytesRemaining -= res;
remaining -= res;
off += res;
- if (this.msgStats != null) {
- this.msgStats.incReceivedBytes(res);
+ if (this.messageStats != null) {
+ this.messageStats.incReceivedBytes(res);
}
} else {
throw new EOFException(
@@ -941,35 +944,38 @@ public class Message {
* @return the number of bytes read into commBuffer
*/
private int readPartChunk(int bytesRemaining) throws IOException {
- final ByteBuffer cb = getCommBuffer();
- if (cb.remaining() >= PART_HEADER_SIZE) {
+ final ByteBuffer commBuffer = getCommBuffer();
+ if (commBuffer.remaining() >= PART_HEADER_SIZE) {
// we already have the next part header in commBuffer so just return
return 0;
}
- if (cb.position() != 0) {
- cb.compact();
+
+ if (commBuffer.position() != 0) {
+ commBuffer.compact();
} else {
- cb.position(cb.limit());
- cb.limit(cb.capacity());
+ commBuffer.position(commBuffer.limit());
+ commBuffer.limit(commBuffer.capacity());
}
- int bytesRead = 0;
- if (this.sc != null) {
+
+ if (this.serverConnection != null) {
// Keep track of the fact that we are making progress
- this.sc.updateProcessingMessage();
+ this.serverConnection.updateProcessingMessage();
}
- if (this.sockCh != null) {
- int remaining = cb.remaining();
+ int bytesRead = 0;
+
+ if (this.socketChannel != null) {
+ int remaining = commBuffer.remaining();
if (remaining > bytesRemaining) {
remaining = bytesRemaining;
- cb.limit(cb.position() + bytesRemaining);
+ commBuffer.limit(commBuffer.position() + bytesRemaining);
}
while (remaining > 0) {
- int res = this.sockCh.read(cb);
+ int res = this.socketChannel.read(commBuffer);
if (res != -1) {
remaining -= res;
bytesRead += res;
- if (this.msgStats != null) {
- this.msgStats.incReceivedBytes(res);
+ if (this.messageStats != null) {
+ this.messageStats.incReceivedBytes(res);
}
} else {
throw new EOFException(
@@ -979,21 +985,20 @@ public class Message {
}
} else {
- int bufSpace = cb.capacity() - cb.position();
- int bytesToRead = bufSpace;
+ int bytesToRead = commBuffer.capacity() - commBuffer.position();
if (bytesRemaining < bytesToRead) {
bytesToRead = bytesRemaining;
}
- int pos = cb.position();
+ int pos = commBuffer.position();
+
while (bytesToRead > 0) {
- int res = 0;
- res = this.is.read(cb.array(), pos, bytesToRead);
+ int res = this.inputStream.read(commBuffer.array(), pos, bytesToRead);
if (res != -1) {
bytesToRead -= res;
pos += res;
bytesRead += res;
- if (this.msgStats != null) {
- this.msgStats.incReceivedBytes(res);
+ if (this.messageStats != null) {
+ this.messageStats.incReceivedBytes(res);
}
} else {
throw new EOFException(
@@ -1001,9 +1006,10 @@ public class Message {
.toLocalizedString());
}
}
- cb.position(pos);
+
+ commBuffer.position(pos);
}
- cb.flip();
+ commBuffer.flip();
return bytesRead;
}
@@ -1011,40 +1017,39 @@ public class Message {
* Gets rid of all the parts that have been added to this message.
*/
public void clearParts() {
- for (int i = 0; i < partsList.length; i++) {
- partsList[i].clear();
+ for (Part part : this.partsList) {
+ part.clear();
}
this.currentPart = 0;
}
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("type=").append(MessageType.getString(msgType));
- sb.append("; payloadLength=").append(payloadLength);
- sb.append("; numberOfParts=").append(numberOfParts);
- sb.append("; transactionId=").append(transactionId);
- sb.append("; currentPart=").append(currentPart);
- sb.append("; messageModified=").append(messageModified);
- sb.append("; flags=").append(Integer.toHexString(flags));
- for (int i = 0; i < numberOfParts; i++) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("type=").append(MessageType.getString(this.messageType));
+ sb.append("; payloadLength=").append(this.payloadLength);
+ sb.append("; numberOfParts=").append(this.numberOfParts);
+ sb.append("; transactionId=").append(this.transactionId);
+ sb.append("; currentPart=").append(this.currentPart);
+ sb.append("; messageModified=").append(this.messageModified);
+ sb.append("; flags=").append(Integer.toHexString(this.flags));
+ for (int i = 0; i < this.numberOfParts; i++) {
sb.append("; part[").append(i).append("]={");
- sb.append(this.partsList[i].toString());
+ sb.append(this.partsList[i]);
sb.append("}");
}
return sb.toString();
}
-
- public void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats)
+ void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, MessageStats msgStats)
throws IOException {
- this.sc = sc;
+ this.serverConnection = sc;
setComms(socket, bb, msgStats);
}
- public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
- this.sockCh = socket.getChannel();
- if (this.sockCh == null) {
+ void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws IOException {
+ this.socketChannel = socket.getChannel();
+ if (this.socketChannel == null) {
setComms(socket, socket.getInputStream(), socket.getOutputStream(), bb, msgStats);
} else {
setComms(socket, null, null, bb, msgStats);
@@ -1052,14 +1057,14 @@ public class Message {
}
public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb,
- MessageStats msgStats) throws IOException {
+ MessageStats msgStats) {
Assert.assertTrue(socket != null);
this.socket = socket;
- this.sockCh = socket.getChannel();
- this.is = is;
- this.os = os;
+ this.socketChannel = socket.getChannel();
+ this.inputStream = is;
+ this.outputStream = os;
this.cachedCommBuffer = bb;
- this.msgStats = msgStats;
+ this.messageStats = msgStats;
}
/**
@@ -1069,11 +1074,11 @@ public class Message {
*/
public void unsetComms() {
this.socket = null;
- this.sockCh = null;
- this.is = null;
- this.os = null;
+ this.socketChannel = null;
+ this.inputStream = null;
+ this.outputStream = null;
this.cachedCommBuffer = null;
- this.msgStats = null;
+ this.messageStats = null;
}
/**
@@ -1084,7 +1089,7 @@ public class Message {
}
public void send(ServerConnection servConn) throws IOException {
- if (this.sc != servConn)
+ if (this.serverConnection != servConn)
throw new IllegalStateException("this.sc was not correctly set");
send(true);
}
@@ -1097,7 +1102,7 @@ public class Message {
}
/**
- * Populates the stats of this <code>Message</code> with information received via its socket
+ * Populates the stats of this {@code Message} with information received via its socket
*/
public void recv() throws IOException {
if (this.socket != null) {
@@ -1111,10 +1116,10 @@ public class Message {
public void recv(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter,
Semaphore msgLimiter) throws IOException {
- this.sc = sc;
+ this.serverConnection = sc;
this.maxIncomingMessageLength = maxMessageLength;
this.dataLimiter = dataLimiter;
- this.msgLimiter = msgLimiter;
+ this.messageLimiter = msgLimiter;
recv();
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 83d0e9d..485ccae 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -723,12 +723,10 @@ public class ServerConnection implements Runnable {
ThreadState threadState = null;
try {
if (msg != null) {
- // this.logger.fine("donormalMsg() msgType " + msg.getMessageType());
- // Since this thread is not interrupted when the cache server is
- // shutdown,
- // test again after a message has been read. This is a bit of a hack. I
- // think this thread should be interrupted, but currently AcceptorImpl
- // doesn't keep track of the threads that it launches.
+ // Since this thread is not interrupted when the cache server is shutdown, test again after
+ // a message has been read. This is a bit of a hack. I think this thread should be
+ // interrupted, but currently AcceptorImpl doesn't keep track of the threads that it
+ // launches.
if (!this.processMessages || (crHelper.isShutdown())) {
if (logger.isDebugEnabled()) {
logger.debug("{} ignoring message of type {} from client {} due to shutdown.",
@@ -1078,8 +1076,6 @@ public class ServerConnection implements Runnable {
*/
public Part updateAndGetSecurityPart() {
// need to take care all message types here
- // this.logger.fine("getSecurityPart() msgType = "
- // + this.requestMsg.msgType);
if (AcceptorImpl.isAuthenticationRequired()
&& this.handshake.getVersion().compareTo(Version.GFE_65) >= 0
&& (this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY)
@@ -1090,40 +1086,40 @@ public class ServerConnection implements Runnable {
if (AcceptorImpl.isAuthenticationRequired() && logger.isDebugEnabled()) {
logger.debug(
"ServerConnection.updateAndGetSecurityPart() not adding security part for msg type {}",
- MessageType.getString(this.requestMsg.msgType));
+ MessageType.getString(this.requestMsg.messageType));
}
}
return null;
}
private boolean isInternalMessage() {
- return (this.requestMsg.msgType == MessageType.CLIENT_READY
- || this.requestMsg.msgType == MessageType.CLOSE_CONNECTION
- || this.requestMsg.msgType == MessageType.GETCQSTATS_MSG_TYPE
- || this.requestMsg.msgType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES
- || this.requestMsg.msgType == MessageType.GET_CLIENT_PR_METADATA
- || this.requestMsg.msgType == MessageType.INVALID
- || this.requestMsg.msgType == MessageType.MAKE_PRIMARY
- || this.requestMsg.msgType == MessageType.MONITORCQ_MSG_TYPE
- || this.requestMsg.msgType == MessageType.PERIODIC_ACK
- || this.requestMsg.msgType == MessageType.PING
- || this.requestMsg.msgType == MessageType.REGISTER_DATASERIALIZERS
- || this.requestMsg.msgType == MessageType.REGISTER_INSTANTIATORS
- || this.requestMsg.msgType == MessageType.REQUEST_EVENT_VALUE
- || this.requestMsg.msgType == MessageType.ADD_PDX_TYPE
- || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_TYPE
- || this.requestMsg.msgType == MessageType.GET_PDX_TYPE_BY_ID
- || this.requestMsg.msgType == MessageType.SIZE
- || this.requestMsg.msgType == MessageType.TX_FAILOVER
- || this.requestMsg.msgType == MessageType.TX_SYNCHRONIZATION
- || this.requestMsg.msgType == MessageType.GET_FUNCTION_ATTRIBUTES
- || this.requestMsg.msgType == MessageType.ADD_PDX_ENUM
- || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_ENUM
- || this.requestMsg.msgType == MessageType.GET_PDX_ENUM_BY_ID
- || this.requestMsg.msgType == MessageType.GET_PDX_TYPES
- || this.requestMsg.msgType == MessageType.GET_PDX_ENUMS
- || this.requestMsg.msgType == MessageType.COMMIT
- || this.requestMsg.msgType == MessageType.ROLLBACK);
+ return (this.requestMsg.messageType == MessageType.CLIENT_READY
+ || this.requestMsg.messageType == MessageType.CLOSE_CONNECTION
+ || this.requestMsg.messageType == MessageType.GETCQSTATS_MSG_TYPE
+ || this.requestMsg.messageType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES
+ || this.requestMsg.messageType == MessageType.GET_CLIENT_PR_METADATA
+ || this.requestMsg.messageType == MessageType.INVALID
+ || this.requestMsg.messageType == MessageType.MAKE_PRIMARY
+ || this.requestMsg.messageType == MessageType.MONITORCQ_MSG_TYPE
+ || this.requestMsg.messageType == MessageType.PERIODIC_ACK
+ || this.requestMsg.messageType == MessageType.PING
+ || this.requestMsg.messageType == MessageType.REGISTER_DATASERIALIZERS
+ || this.requestMsg.messageType == MessageType.REGISTER_INSTANTIATORS
+ || this.requestMsg.messageType == MessageType.REQUEST_EVENT_VALUE
+ || this.requestMsg.messageType == MessageType.ADD_PDX_TYPE
+ || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_TYPE
+ || this.requestMsg.messageType == MessageType.GET_PDX_TYPE_BY_ID
+ || this.requestMsg.messageType == MessageType.SIZE
+ || this.requestMsg.messageType == MessageType.TX_FAILOVER
+ || this.requestMsg.messageType == MessageType.TX_SYNCHRONIZATION
+ || this.requestMsg.messageType == MessageType.GET_FUNCTION_ATTRIBUTES
+ || this.requestMsg.messageType == MessageType.ADD_PDX_ENUM
+ || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_ENUM
+ || this.requestMsg.messageType == MessageType.GET_PDX_ENUM_BY_ID
+ || this.requestMsg.messageType == MessageType.GET_PDX_TYPES
+ || this.requestMsg.messageType == MessageType.GET_PDX_ENUMS
+ || this.requestMsg.messageType == MessageType.COMMIT
+ || this.requestMsg.messageType == MessageType.ROLLBACK);
}
public void run() {
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java
new file mode 100644
index 0000000..7118347
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.internal.InternalDataSerializer;
+
+/**
+ * Send interest registration to another server. Since interest registration performs a state-flush
+ * operation this message must not transmitted on an ordered socket.
+ * <p>
+ * Extracted from CacheClientNotifier
+ */
+public class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage
+ implements MessageWithReply {
+
+ private ClientProxyMembershipID clientId;
+ private ClientInterestMessageImpl clientMessage;
+ private int processorId;
+
+ ServerInterestRegistrationMessage(ClientProxyMembershipID clientId,
+ ClientInterestMessageImpl clientInterestMessage) {
+ this.clientId = clientId;
+ this.clientMessage = clientInterestMessage;
+ }
+
+ public ServerInterestRegistrationMessage() {
+ // deserializing in fromData
+ }
+
+ static void sendInterestChange(DM dm, ClientProxyMembershipID clientId,
+ ClientInterestMessageImpl clientInterestMessage) {
+ ServerInterestRegistrationMessage registrationMessage =
+ new ServerInterestRegistrationMessage(clientId, clientInterestMessage);
+
+ Set recipients = dm.getOtherDistributionManagerIds();
+ registrationMessage.setRecipients(recipients);
+
+ ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, recipients);
+ registrationMessage.processorId = replyProcessor.getProcessorId();
+
+ dm.putOutgoing(registrationMessage);
+
+ try {
+ replyProcessor.waitForReplies();
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ protected void process(DistributionManager dm) {
+ // Get the proxy for the proxy id
+ try {
+ CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+ if (clientNotifier != null) {
+ CacheClientProxy proxy = clientNotifier.getClientProxy(this.clientId);
+ // If this VM contains a proxy for the requested proxy id, forward the
+ // message on to the proxy for processing
+ if (proxy != null) {
+ proxy.processInterestMessage(this.clientMessage);
+ }
+ }
+ } finally {
+ ReplyMessage reply = new ReplyMessage();
+ reply.setProcessorId(this.processorId);
+ reply.setRecipient(getSender());
+ try {
+ dm.putOutgoing(reply);
+ } catch (CancelException ignore) {
+ // can't send a reply, so ignore the exception
+ }
+ }
+ }
+
+ @Override
+ public int getDSFID() {
+ return SERVER_INTEREST_REGISTRATION_MESSAGE;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ out.writeInt(this.processorId);
+ InternalDataSerializer.invokeToData(this.clientId, out);
+ InternalDataSerializer.invokeToData(this.clientMessage, out);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.processorId = in.readInt();
+ this.clientId = new ClientProxyMembershipID();
+ InternalDataSerializer.invokeFromData(this.clientId, in);
+ this.clientMessage = new ClientInterestMessageImpl();
+ InternalDataSerializer.invokeFromData(this.clientMessage, in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
index 1b599e9..2cb36cd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
@@ -39,29 +39,30 @@ public class AddPdxEnum extends BaseCommand {
private AddPdxEnum() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
if (logger.isDebugEnabled()) {
logger.debug("{}: Received get pdx id for enum request ({} parts) from {}",
- servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString());
+ serverConnection.getName(), clientMessage.getNumberOfParts(),
+ serverConnection.getSocketString());
}
- int noOfParts = msg.getNumberOfParts();
+ int noOfParts = clientMessage.getNumberOfParts();
- EnumInfo enumInfo = (EnumInfo) msg.getPart(0).getObject();
- int enumId = msg.getPart(1).getInt();
+ EnumInfo enumInfo = (EnumInfo) clientMessage.getPart(0).getObject();
+ int enumId = clientMessage.getPart(1).getInt();
try {
- InternalCache cache = servConn.getCache();
+ InternalCache cache = serverConnection.getCache();
TypeRegistry registry = cache.getPdxRegistry();
registry.addRemoteEnum(enumId, enumInfo);
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
index 9b8302e..3feba0d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
@@ -39,33 +39,34 @@ public class AddPdxType extends BaseCommand {
private AddPdxType() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
if (logger.isDebugEnabled()) {
logger.debug("{}: Received get pdx id for type request ({} parts) from {}",
- servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString());
+ serverConnection.getName(), clientMessage.getNumberOfParts(),
+ serverConnection.getSocketString());
}
- int noOfParts = msg.getNumberOfParts();
+ int noOfParts = clientMessage.getNumberOfParts();
- PdxType type = (PdxType) msg.getPart(0).getObject();
- int typeId = msg.getPart(1).getInt();
+ PdxType type = (PdxType) clientMessage.getPart(0).getObject();
+ int typeId = clientMessage.getPart(1).getInt();
// The native client needs this line
// because it doesn't set the type id on the
// client side.
type.setTypeId(typeId);
try {
- InternalCache cache = servConn.getCache();
+ InternalCache cache = serverConnection.getCache();
TypeRegistry registry = cache.getPdxRegistry();
registry.addRemoteType(typeId, type);
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java
index 959430c..ab19954 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java
@@ -47,15 +47,15 @@ public class ClearRegion extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
Part regionNamePart = null, callbackArgPart = null;
String regionName = null;
Object callbackArg = null;
Part eventPart = null;
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
@@ -63,36 +63,38 @@ public class ClearRegion extends BaseCommand {
stats.incReadClearRegionRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
- eventPart = msg.getPart(1);
+ regionNamePart = clientMessage.getPart(0);
+ eventPart = clientMessage.getPart(1);
// callbackArgPart = null; (redundant assignment)
- if (msg.getNumberOfParts() > 2) {
- callbackArgPart = msg.getPart(2);
+ if (clientMessage.getNumberOfParts() > 2) {
+ callbackArgPart = clientMessage.getPart(2);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
regionName = regionNamePart.getString();
if (logger.isDebugEnabled()) {
- logger.debug(servConn.getName() + ": Received clear region request (" + msg.getPayloadLength()
- + " bytes) from " + servConn.getSocketString() + " for region " + regionName);
+ logger.debug(serverConnection.getName() + ": Received clear region request ("
+ + clientMessage.getPayloadLength() + " bytes) from " + serverConnection.getSocketString()
+ + " for region " + regionName);
}
// Process the clear region request
if (regionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ClearRegion_0_THE_INPUT_REGION_NAME_FOR_THE_CLEAR_REGION_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
String errMessage =
LocalizedStrings.ClearRegion_THE_INPUT_REGION_NAME_FOR_THE_CLEAR_REGION_REQUEST_IS_NULL
.toLocalizedString();
- writeErrorResponse(msg, MessageType.CLEAR_REGION_DATA_ERROR, errMessage, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.CLEAR_REGION_DATA_ERROR, errMessage,
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -100,35 +102,36 @@ public class ClearRegion extends BaseCommand {
if (region == null) {
String reason = LocalizedStrings.ClearRegion_WAS_NOT_FOUND_DURING_CLEAR_REGION_REGUEST
.toLocalizedString();
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventID eventId =
+ new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
try {
// Clear the region
this.securityService.authorizeRegionWrite(regionName);
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
RegionClearOperationContext clearContext =
authzRequest.clearAuthorize(regionName, callbackArg);
callbackArg = clearContext.getCallbackArg();
}
- region.basicBridgeClear(callbackArg, servConn.getProxyID(),
+ region.basicBridgeClear(callbackArg, serverConnection.getProxyID(),
true /* boolean from cache Client */, eventId);
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// If an exception occurs during the clear, preserve the connection
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -138,10 +141,11 @@ public class ClearRegion extends BaseCommand {
start = DistributionStats.getStatTime();
stats.incProcessClearRegionTime(start - oldStart);
}
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug(servConn.getName() + ": Sent clear region response for region " + regionName);
+ logger.debug(
+ serverConnection.getName() + ": Sent clear region response for region " + regionName);
}
stats.incWriteClearRegionResponseTime(DistributionStats.getStatTime() - start);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java
index d50e522..cf9c470 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java
@@ -35,34 +35,36 @@ public class ClientReady extends BaseCommand {
private ClientReady() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CacheServerStats stats = servConn.getCacheServerStats();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
+ throws IOException {
+ CacheServerStats stats = serverConnection.getCacheServerStats();
{
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadClientReadyRequestTime(start - oldStart);
}
try {
- String clientHost = servConn.getSocketHost();
- int clientPort = servConn.getSocketPort();
+ String clientHost = serverConnection.getSocketHost();
+ int clientPort = serverConnection.getSocketPort();
if (logger.isDebugEnabled()) {
logger.debug("{}: Received client ready request ({} bytes) from {} on {}:{}",
- servConn.getName(), msg.getPayloadLength(), servConn.getProxyID(), clientHost,
- clientPort);
+ serverConnection.getName(), clientMessage.getPayloadLength(),
+ serverConnection.getProxyID(), clientHost, clientPort);
}
- servConn.getAcceptor().getCacheClientNotifier().readyForEvents(servConn.getProxyID());
+ serverConnection.getAcceptor().getCacheClientNotifier()
+ .readyForEvents(serverConnection.getProxyID());
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incProcessClientReadyTime(start - oldStart);
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug(servConn.getName() + ": Processed client ready request from "
- + servConn.getProxyID() + " on " + clientHost + ":" + clientPort);
+ logger.debug(serverConnection.getName() + ": Processed client ready request from "
+ + serverConnection.getProxyID() + " on " + clientHost + ":" + clientPort);
}
} finally {
stats.incWriteClientReadyResponseTime(DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java
index 66045aa..21f0cad 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java
@@ -39,43 +39,44 @@ public class CloseConnection extends BaseCommand {
private CloseConnection() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CacheServerStats stats = servConn.getCacheServerStats();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
+ throws IOException {
+ CacheServerStats stats = serverConnection.getCacheServerStats();
long oldStart = start;
- boolean respondToClient = servConn.getClientVersion().compareTo(Version.GFE_90) >= 0;
+ boolean respondToClient = serverConnection.getClientVersion().compareTo(Version.GFE_90) >= 0;
start = DistributionStats.getStatTime();
stats.incReadCloseConnectionRequestTime(start - oldStart);
if (respondToClient) {
// newer clients will wait for a response or EOFException
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
}
try {
- servConn.setClientDisconnectCleanly();
- String clientHost = servConn.getSocketHost();
- int clientPort = servConn.getSocketPort();
+ serverConnection.setClientDisconnectCleanly();
+ String clientHost = serverConnection.getSocketHost();
+ int clientPort = serverConnection.getSocketPort();
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received close request ({} bytes) from {}:{}", servConn.getName(),
- msg.getPayloadLength(), clientHost, clientPort);
+ logger.debug("{}: Received close request ({} bytes) from {}:{}", serverConnection.getName(),
+ clientMessage.getPayloadLength(), clientHost, clientPort);
}
- Part keepalivePart = msg.getPart(0);
+ Part keepalivePart = clientMessage.getPart(0);
byte[] keepaliveByte = keepalivePart.getSerializedForm();
boolean keepalive = (keepaliveByte == null || keepaliveByte[0] == 0) ? false : true;
- servConn.getAcceptor().getCacheClientNotifier().setKeepAlive(servConn.getProxyID(),
- keepalive);
+ serverConnection.getAcceptor().getCacheClientNotifier()
+ .setKeepAlive(serverConnection.getProxyID(), keepalive);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Processed close request from {}:{}, keepAlive: {}", servConn.getName(),
- clientHost, clientPort, keepalive);
+ logger.debug("{}: Processed close request from {}:{}, keepAlive: {}",
+ serverConnection.getName(), clientHost, clientPort, keepalive);
}
} finally {
if (respondToClient) {
- writeReply(msg, servConn);
+ writeReply(clientMessage, serverConnection);
}
- servConn.setFlagProcessMessagesAsFalse();
+ serverConnection.setFlagProcessMessagesAsFalse();
stats.incProcessCloseConnectionTime(DistributionStats.getStatTime() - start);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index 55ef09b..366d77c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -50,12 +50,13 @@ public class CommitCommand extends BaseCommand {
private CommitCommand() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
- TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
+ throws IOException {
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
InternalDistributedMember client =
- (InternalDistributedMember) servConn.getProxyID().getDistributedMember();
- int uniqId = msg.getTransactionId();
+ (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
+ int uniqId = clientMessage.getTransactionId();
TXId txId = new TXId(client, uniqId);
TXCommitMessage commitMsg = null;
if (txMgr.isHostedTxRecentlyCompleted(txId)) {
@@ -64,11 +65,11 @@ public class CommitCommand extends BaseCommand {
logger.debug("TX: returning a recently committed txMessage for tx: {}", txId);
}
if (!txMgr.isExceptionToken(commitMsg)) {
- writeCommitResponse(commitMsg, msg, servConn);
+ writeCommitResponse(commitMsg, clientMessage, serverConnection);
commitMsg.setClientVersion(null); // fixes bug 46529
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
} else {
- sendException(msg, servConn, txMgr.getExceptionForToken(commitMsg, txId));
+ sendException(clientMessage, serverConnection, txMgr.getExceptionForToken(commitMsg, txId));
}
txMgr.removeHostedTXState(txId);
return;
@@ -87,10 +88,10 @@ public class CommitCommand extends BaseCommand {
txMgr.commit();
commitMsg = txProxy.getCommitMessage();
- writeCommitResponse(commitMsg, msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeCommitResponse(commitMsg, clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
} catch (Exception e) {
- sendException(msg, servConn, e);
+ sendException(clientMessage, serverConnection, e);
} finally {
if (txId != null) {
txMgr.removeHostedTXState(txId);
@@ -115,7 +116,7 @@ public class CommitCommand extends BaseCommand {
if (response != null) {
response.setClientVersion(servConn.getClientVersion());
}
- responseMsg.addObjPart(response, zipValues);
+ responseMsg.addObjPart(response, false);
servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
if (logger.isDebugEnabled()) {
logger.debug("TX: sending a nonNull response for transaction: {}",
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java
index c1b67e1..9cb2528 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java
@@ -51,34 +51,36 @@ public class ContainsKey extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
+ throws IOException {
Part regionNamePart = null;
Part keyPart = null;
String regionName = null;
Object key = null;
- CacheServerStats stats = servConn.getCacheServerStats();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadContainsKeyRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
- keyPart = msg.getPart(1);
+ regionNamePart = clientMessage.getPart(0);
+ keyPart = clientMessage.getPart(1);
regionName = regionNamePart.getString();
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Received containsKey request ({} bytes) from {} for region {} key {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key);
+ serverConnection.getName(), clientMessage.getPayloadLength(),
+ serverConnection.getSocketString(), regionName, key);
}
// Process the containsKey request
@@ -87,47 +89,48 @@ public class ContainsKey extends BaseCommand {
if (key == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ContainsKey_0_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage = LocalizedStrings.ContainsKey_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL
.toLocalizedString();
}
if (regionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ContainsKey_0_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage =
LocalizedStrings.ContainsKey_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL
.toLocalizedString();
}
- writeErrorResponse(msg, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage,
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason =
LocalizedStrings.ContainsKey_WAS_NOT_FOUND_DURING_CONTAINSKEY_REQUEST.toLocalizedString();
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
try {
this.securityService.authorizeRegionRead(regionName, key.toString());
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
try {
authzRequest.containsKeyAuthorize(regionName, key);
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -140,10 +143,10 @@ public class ContainsKey extends BaseCommand {
start = DistributionStats.getStatTime();
stats.incProcessContainsKeyTime(start - oldStart);
}
- writeContainsKeyResponse(containsKey, msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeContainsKeyResponse(containsKey, clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent containsKey response for region {} key {}", servConn.getName(),
+ logger.debug("{}: Sent containsKey response for region {} key {}", serverConnection.getName(),
regionName, key);
}
stats.incWriteContainsKeyResponseTime(DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java
index dc8f9eb..b2ce055 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java
@@ -55,34 +55,36 @@ public class ContainsKey66 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
+ throws IOException {
Part regionNamePart = null, keyPart = null;
String regionName = null;
Object key = null;
ContainsKeyOp.MODE mode;
- CacheServerStats stats = servConn.getCacheServerStats();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadContainsKeyRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
- keyPart = msg.getPart(1);
- mode = ContainsKeyOp.MODE.values()[(msg.getPart(2).getInt())];
+ regionNamePart = clientMessage.getPart(0);
+ keyPart = clientMessage.getPart(1);
+ mode = ContainsKeyOp.MODE.values()[(clientMessage.getPart(2).getInt())];
regionName = regionNamePart.getString();
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Received containsKey request ({} bytes) from {} for region {} key {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key);
+ serverConnection.getName(), clientMessage.getPayloadLength(),
+ serverConnection.getSocketString(), regionName, key);
}
// Process the containsKey request
@@ -91,46 +93,47 @@ public class ContainsKey66 extends BaseCommand {
if (key == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ContainsKey_0_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage = LocalizedStrings.ContainsKey_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL
.toLocalizedString();
}
if (regionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ContainsKey_0_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage =
LocalizedStrings.ContainsKey_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL
.toLocalizedString();
}
- writeErrorResponse(msg, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage,
+ serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason =
LocalizedStrings.ContainsKey_WAS_NOT_FOUND_DURING_CONTAINSKEY_REQUEST.toLocalizedString();
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
try {
this.securityService.authorizeRegionRead(regionName, key.toString());
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
try {
authzRequest.containsKeyAuthorize(regionName, key);
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -157,10 +160,10 @@ public class ContainsKey66 extends BaseCommand {
start = DistributionStats.getStatTime();
stats.incProcessContainsKeyTime(start - oldStart);
}
- writeContainsKeyResponse(containsKey, msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeContainsKeyResponse(containsKey, clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent containsKey response for region {} key {}", servConn.getName(),
+ logger.debug("{}: Sent containsKey response for region {} key {}", serverConnection.getName(),
regionName, key);
}
stats.incWriteContainsKeyResponseTime(DistributionStats.getStatTime() - start);