You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/31 20:51:09 UTC
[24/47] geode git commit: Cleanup BaseCommand
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 4bd4970..5631184 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -17,9 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.distributed.ConfigurationProperties.*;
import java.io.BufferedOutputStream;
-import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Method;
@@ -70,12 +68,7 @@ import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DM;
import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.distributed.internal.MessageWithReply;
-import org.apache.geode.distributed.internal.ReplyMessage;
-import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.internal.ClassLoadUtil;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.statistics.DummyStatisticsFactory;
@@ -127,6 +120,22 @@ import org.apache.geode.security.AuthenticationRequiredException;
public class CacheClientNotifier {
private static final Logger logger = LogService.getLogger();
+ /**
+ * The size of the server-to-client communication socket buffers. This can be modified using the
+ * BridgeServer.SOCKET_BUFFER_SIZE system property.
+ */
+ private static final int socketBufferSize =
+ Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768);
+
+ private static final long CLIENT_PING_TASK_PERIOD =
+ Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000);
+
+ /**
+ * package-private to avoid synthetic accessor
+ */
+ static final long CLIENT_PING_TASK_COUNTER =
+ Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3);
+
private static volatile CacheClientNotifier ccnSingleton;
/**
@@ -149,20 +158,6 @@ public class CacheClientNotifier {
private final Set<ClientProxyMembershipID> timedOutDurableClientProxies = new HashSet<>();
- /**
- * The GemFire {@code InternalCache}. Note that since this is a singleton class you should not use
- * a direct reference to cache in CacheClientNotifier code. Instead, you should always use
- * {@code getCache()}
- */
- private InternalCache cache; // TODO: fix synchronization of cache
-
- private InternalLogWriter logWriter;
-
- /**
- * The GemFire security {@code LogWriter}
- */
- private InternalLogWriter securityLogWriter;
-
/** the maximum number of messages that can be enqueued in a client-queue. */
private final int maximumMessageCount;
@@ -179,24 +174,9 @@ public class CacheClientNotifier {
private final CacheServerStats acceptorStats;
/**
- * haContainer can hold either the name of the client-messages-region (in case of eviction
- * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In
- * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value.
- */
- private volatile HAContainerWrapper haContainer;
-
- /**
- * The size of the server-to-client communication socket buffers. This can be modified using the
- * BridgeServer.SOCKET_BUFFER_SIZE system property.
- */
- private static final int socketBufferSize =
- Integer.getInteger("BridgeServer.SOCKET_BUFFER_SIZE", 32768);
-
- /**
* The statistics for this notifier
*/
- final CacheClientNotifierStats statistics; // TODO: pass statistics into CacheClientProxy then
- // make private
+ final CacheClientNotifierStats statistics; // TODO: pass statistics into CacheClientProxy
/**
* The {@code InterestRegistrationListener} instances registered in this VM. This is used when
@@ -209,55 +189,41 @@ public class CacheClientNotifier {
* provide a read-only {@code Set} of listeners.
*/
private final Set readableInterestRegistrationListeners =
- Collections.unmodifiableSet(this.writableInterestRegistrationListeners);
-
- /**
- * System property name for indicating how much frequently the "Queue full" message should be
- * logged.
- */
- private static final String MAX_QUEUE_LOG_FREQUENCY =
- DistributionConfig.GEMFIRE_PREFIX + "logFrequency.clientQueueReachedMaxLimit";
+ Collections.unmodifiableSet(this.writableInterestRegistrationListeners);
- public static final long DEFAULT_LOG_FREQUENCY = 1000;
+ private final Map<String, DefaultQuery> compiledQueries = new ConcurrentHashMap<>();
- private static final String EVENT_ENQUEUE_WAIT_TIME_NAME =
- DistributionConfig.GEMFIRE_PREFIX + "subscription.EVENT_ENQUEUE_WAIT_TIME";
+ private final Object lockIsCompiledQueryCleanupThreadStarted = new Object();
- private static final int DEFAULT_EVENT_ENQUEUE_WAIT_TIME = 100;
+ private final SocketCloser socketCloser;
- /**
- * System property value denoting the time in milliseconds. Any thread putting an event into a
- * subscription queue, which is full, will wait this much time for the queue to make space. It'll
- * then enque the event possibly causing the queue to grow beyond its capacity/max-size. See
- * #51400.
- */
- public static int eventEnqueueWaitTime; // TODO: encapsulate eventEnqueueWaitTime
+ /** package-private to avoid synthetic accessor */
+ final Set blackListedClients = new CopyOnWriteArraySet();
/**
- * The frequency of logging the "Queue full" message.
+ * haContainer can hold either the name of the client-messages-region (in case of eviction
+ * policies "mem" or "entry") or an instance of HashMap (in case of eviction policy "none"). In
+ * both the cases, it'll store HAEventWrapper as its key and ClientUpdateMessage as its value.
*/
- private long logFrequency = DEFAULT_LOG_FREQUENCY;
-
- private final Map<String, DefaultQuery> compiledQueries = new ConcurrentHashMap<>();
+ private volatile HAContainerWrapper haContainer;
private volatile boolean isCompiledQueryCleanupThreadStarted = false;
- private final Object lockIsCompiledQueryCleanupThreadStarted = new Object();
-
- private SystemTimer.SystemTimerTask clientPingTask; // TODO: fix synchronization of clientPingTask
-
- private final SocketCloser socketCloser;
+ /**
+ * The GemFire {@code InternalCache}. Note that since this is a singleton class you should not use
+ * a direct reference to cache in CacheClientNotifier code. Instead, you should always use
+ * {@code getCache()}
+ */
+ private InternalCache cache; // TODO: fix synchronization of cache
- private static final long CLIENT_PING_TASK_PERIOD =
- Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingPeriod", 60000);
+ private InternalLogWriter logWriter;
/**
- * package-private to avoid synthetic accessor
+ * The GemFire security {@code LogWriter}
*/
- static final long CLIENT_PING_TASK_COUNTER =
- Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "serverToClientPingCounter", 3);
+ private InternalLogWriter securityLogWriter;
- private final Set blackListedClients = new CopyOnWriteArraySet();
+ private SystemTimer.SystemTimerTask clientPingTask; // TODO: fix synchronization of clientPingTask
/**
* Factory method to construct a CacheClientNotifier {@code CacheClientNotifier} instance.
@@ -319,21 +285,6 @@ public class CacheClientNotifier {
}
this.statistics = new CacheClientNotifierStats(factory);
- try {
- this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY));
- if (this.logFrequency <= 0) {
- this.logFrequency = DEFAULT_LOG_FREQUENCY;
- }
- } catch (Exception e) {
- this.logFrequency = DEFAULT_LOG_FREQUENCY;
- }
-
- eventEnqueueWaitTime =
- Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME);
- if (eventEnqueueWaitTime < 0) {
- eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME;
- }
-
// Schedule task to periodically ping clients.
scheduleClientPingTask();
}
@@ -923,7 +874,7 @@ public class CacheClientNotifier {
* in it that determines which clients will receive the event.
*/
public static void notifyClients(InternalCacheEvent event) {
- CacheClientNotifier instance = ccnSingleton;
+ CacheClientNotifier instance = getInstance();
if (instance != null) {
instance.singletonNotifyClients(event, null);
}
@@ -935,7 +886,7 @@ public class CacheClientNotifier {
*/
public static void notifyClients(InternalCacheEvent event,
ClientUpdateMessage clientUpdateMessage) {
- CacheClientNotifier instance = ccnSingleton;
+ CacheClientNotifier instance = getInstance();
if (instance != null) {
instance.singletonNotifyClients(event, clientUpdateMessage);
}
@@ -1094,7 +1045,7 @@ public class CacheClientNotifier {
* interest established, or override the isClientInterested method to implement its own routing
*/
public static void routeClientMessage(Conflatable clientMessage) {
- CacheClientNotifier instance = ccnSingleton;
+ CacheClientNotifier instance = getInstance();
if (instance != null) {
// ok to use keySet here because all we do is call getClientProxy with these keys
instance.singletonRouteClientMessage(clientMessage, instance.clientProxies.keySet());
@@ -1106,7 +1057,7 @@ public class CacheClientNotifier {
*/
static void routeSingleClientMessage(ClientUpdateMessage clientMessage,
ClientProxyMembershipID clientProxyMembershipId) {
- CacheClientNotifier instance = ccnSingleton;
+ CacheClientNotifier instance = getInstance();
if (instance != null) {
instance.singletonRouteClientMessage(clientMessage,
Collections.singleton(clientProxyMembershipId));
@@ -1589,7 +1540,7 @@ public class CacheClientNotifier {
}
}
- if (noActiveServer() && ccnSingleton != null) {
+ if (noActiveServer() && getInstance() != null) {
ccnSingleton = null;
if (this.haContainer != null) {
this.haContainer.cleanUp();
@@ -1814,7 +1765,7 @@ public class CacheClientNotifier {
/**
* Shuts down durable client proxy
*/
- public boolean closeDurableClientProxy(String durableClientId) throws CacheException {
+ public boolean closeDurableClientProxy(String durableClientId) {
CacheClientProxy ccp = getClientProxy(durableClientId);
if (ccp == null) {
return false;
@@ -1828,8 +1779,7 @@ public class CacheClientNotifier {
if (logger.isDebugEnabled()) {
logger.debug("Cannot close running durable client: {}", durableClientId);
}
- // TODO: never throw an anonymous inner class
- throw new CacheException("Cannot close a running durable client : " + durableClientId) {};
+ throw new IllegalStateException("Cannot close a running durable client : " + durableClientId);
}
}
@@ -2114,10 +2064,6 @@ public class CacheClientNotifier {
CLIENT_PING_TASK_PERIOD, CLIENT_PING_TASK_PERIOD);
}
- public long getLogFrequency() {
- return this.logFrequency;
- }
-
/**
* @return the haContainer
*/
@@ -2182,93 +2128,4 @@ public class CacheClientNotifier {
}
}
- /**
- * Static inner-class ServerInterestRegistrationMessage
- * <p>
- * this message is used to send interest registration to another server. Since interest
- * registration performs a state-flush operation this message must not transmitted on an ordered
- * socket
- */
- public static class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage
- implements MessageWithReply {
-
- ClientProxyMembershipID clientId;
- ClientInterestMessageImpl clientMessage;
- int processorId;
-
- ServerInterestRegistrationMessage(ClientProxyMembershipID clientID,
- ClientInterestMessageImpl msg) {
- this.clientId = clientID;
- this.clientMessage = msg;
- }
-
- public ServerInterestRegistrationMessage() {
- // nothing
- }
-
- static void sendInterestChange(DM dm, ClientProxyMembershipID clientID,
- ClientInterestMessageImpl msg) {
- ServerInterestRegistrationMessage registrationMessage =
- new ServerInterestRegistrationMessage(clientID, msg);
- Set recipients = dm.getOtherDistributionManagerIds();
- registrationMessage.setRecipients(recipients);
- ReplyProcessor21 rp = new ReplyProcessor21(dm, recipients);
- registrationMessage.processorId = rp.getProcessorId();
- dm.putOutgoing(registrationMessage);
- try {
- rp.waitForReplies();
- } catch (InterruptedException ignore) {
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- protected void process(DistributionManager dm) {
- // Get the proxy for the proxy id
- try {
- CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
- if (clientNotifier != null) {
- CacheClientProxy proxy = clientNotifier.getClientProxy(this.clientId);
- // If this VM contains a proxy for the requested proxy id, forward the
- // message on to the proxy for processing
- if (proxy != null) {
- proxy.processInterestMessage(this.clientMessage);
- }
- }
- } finally {
- ReplyMessage reply = new ReplyMessage();
- reply.setProcessorId(this.processorId);
- reply.setRecipient(getSender());
- try {
- dm.putOutgoing(reply);
- } catch (CancelException ignore) {
- // can't send a reply, so ignore the exception
- }
- }
- }
-
- @Override
- public int getDSFID() {
- return SERVER_INTEREST_REGISTRATION_MESSAGE;
- }
-
- @Override
- public void toData(DataOutput out) throws IOException {
- super.toData(out);
- out.writeInt(this.processorId);
- InternalDataSerializer.invokeToData(this.clientId, out);
- InternalDataSerializer.invokeToData(this.clientMessage, out);
- }
-
- @Override
- public void fromData(DataInput in) throws IOException, ClassNotFoundException {
- super.fromData(in);
- this.processorId = in.readInt();
- this.clientId = new ClientProxyMembershipID();
- InternalDataSerializer.invokeFromData(this.clientId, in);
- this.clientMessage = new ClientInterestMessageImpl();
- InternalDataSerializer.invokeFromData(this.clientMessage, in);
- }
- }
}
-
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java
new file mode 100644
index 0000000..5860982
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerInterestRegistrationMessage.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.ReplyProcessor21;
+import org.apache.geode.internal.InternalDataSerializer;
+
+/**
+ * Send interest registration to another server. Since interest registration performs a state-flush operation this message must not transmitted on an ordered socket.
+ * <p>
+ * Extracted from CacheClientNotifier
+ */
+public class ServerInterestRegistrationMessage extends HighPriorityDistributionMessage
+ implements MessageWithReply {
+
+ private ClientProxyMembershipID clientId;
+ private ClientInterestMessageImpl clientMessage;
+ private int processorId;
+
+ ServerInterestRegistrationMessage(ClientProxyMembershipID clientId, ClientInterestMessageImpl clientInterestMessage) {
+ this.clientId = clientId;
+ this.clientMessage = clientInterestMessage;
+ }
+
+ public ServerInterestRegistrationMessage() {
+ // deserializing in fromData
+ }
+
+ static void sendInterestChange(DM dm, ClientProxyMembershipID clientId, ClientInterestMessageImpl clientInterestMessage) {
+ ServerInterestRegistrationMessage registrationMessage =
+ new ServerInterestRegistrationMessage(clientId, clientInterestMessage);
+
+ Set recipients = dm.getOtherDistributionManagerIds();
+ registrationMessage.setRecipients(recipients);
+
+ ReplyProcessor21 replyProcessor = new ReplyProcessor21(dm, recipients);
+ registrationMessage.processorId = replyProcessor.getProcessorId();
+
+ dm.putOutgoing(registrationMessage);
+
+ try {
+ replyProcessor.waitForReplies();
+ } catch (InterruptedException ignore) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ protected void process(DistributionManager dm) {
+ // Get the proxy for the proxy id
+ try {
+ CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance();
+ if (clientNotifier != null) {
+ CacheClientProxy proxy = clientNotifier.getClientProxy(this.clientId);
+ // If this VM contains a proxy for the requested proxy id, forward the
+ // message on to the proxy for processing
+ if (proxy != null) {
+ proxy.processInterestMessage(this.clientMessage);
+ }
+ }
+ } finally {
+ ReplyMessage reply = new ReplyMessage();
+ reply.setProcessorId(this.processorId);
+ reply.setRecipient(getSender());
+ try {
+ dm.putOutgoing(reply);
+ } catch (CancelException ignore) {
+ // can't send a reply, so ignore the exception
+ }
+ }
+ }
+
+ @Override
+ public int getDSFID() {
+ return SERVER_INTEREST_REGISTRATION_MESSAGE;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ out.writeInt(this.processorId);
+ InternalDataSerializer.invokeToData(this.clientId, out);
+ InternalDataSerializer.invokeToData(this.clientMessage, out);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.processorId = in.readInt();
+ this.clientId = new ClientProxyMembershipID();
+ InternalDataSerializer.invokeFromData(this.clientId, in);
+ this.clientMessage = new ClientInterestMessageImpl();
+ InternalDataSerializer.invokeFromData(this.clientMessage, in);
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
index 1b599e9..fb0bd50 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxEnum.java
@@ -39,29 +39,29 @@ public class AddPdxEnum extends BaseCommand {
private AddPdxEnum() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
if (logger.isDebugEnabled()) {
logger.debug("{}: Received get pdx id for enum request ({} parts) from {}",
- servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString());
+ serverConnection.getName(), clientMessage.getNumberOfParts(), serverConnection.getSocketString());
}
- int noOfParts = msg.getNumberOfParts();
+ int noOfParts = clientMessage.getNumberOfParts();
- EnumInfo enumInfo = (EnumInfo) msg.getPart(0).getObject();
- int enumId = msg.getPart(1).getInt();
+ EnumInfo enumInfo = (EnumInfo) clientMessage.getPart(0).getObject();
+ int enumId = clientMessage.getPart(1).getInt();
try {
- InternalCache cache = servConn.getCache();
+ InternalCache cache = serverConnection.getCache();
TypeRegistry registry = cache.getPdxRegistry();
registry.addRemoteEnum(enumId, enumInfo);
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
index 9b8302e..10a065c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AddPdxType.java
@@ -39,33 +39,33 @@ public class AddPdxType extends BaseCommand {
private AddPdxType() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, ClassNotFoundException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
if (logger.isDebugEnabled()) {
logger.debug("{}: Received get pdx id for type request ({} parts) from {}",
- servConn.getName(), msg.getNumberOfParts(), servConn.getSocketString());
+ serverConnection.getName(), clientMessage.getNumberOfParts(), serverConnection.getSocketString());
}
- int noOfParts = msg.getNumberOfParts();
+ int noOfParts = clientMessage.getNumberOfParts();
- PdxType type = (PdxType) msg.getPart(0).getObject();
- int typeId = msg.getPart(1).getInt();
+ PdxType type = (PdxType) clientMessage.getPart(0).getObject();
+ int typeId = clientMessage.getPart(1).getInt();
// The native client needs this line
// because it doesn't set the type id on the
// client side.
type.setTypeId(typeId);
try {
- InternalCache cache = servConn.getCache();
+ InternalCache cache = serverConnection.getCache();
TypeRegistry registry = cache.getPdxRegistry();
registry.addRemoteType(typeId, type);
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java
index 959430c..c9c5a9d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClearRegion.java
@@ -47,15 +47,15 @@ public class ClearRegion extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
Part regionNamePart = null, callbackArgPart = null;
String regionName = null;
Object callbackArg = null;
Part eventPart = null;
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
@@ -63,36 +63,36 @@ public class ClearRegion extends BaseCommand {
stats.incReadClearRegionRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
- eventPart = msg.getPart(1);
+ regionNamePart = clientMessage.getPart(0);
+ eventPart = clientMessage.getPart(1);
// callbackArgPart = null; (redundant assignment)
- if (msg.getNumberOfParts() > 2) {
- callbackArgPart = msg.getPart(2);
+ if (clientMessage.getNumberOfParts() > 2) {
+ callbackArgPart = clientMessage.getPart(2);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
regionName = regionNamePart.getString();
if (logger.isDebugEnabled()) {
- logger.debug(servConn.getName() + ": Received clear region request (" + msg.getPayloadLength()
- + " bytes) from " + servConn.getSocketString() + " for region " + regionName);
+ logger.debug(serverConnection.getName() + ": Received clear region request (" + clientMessage.getPayloadLength()
+ + " bytes) from " + serverConnection.getSocketString() + " for region " + regionName);
}
// Process the clear region request
if (regionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ClearRegion_0_THE_INPUT_REGION_NAME_FOR_THE_CLEAR_REGION_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
String errMessage =
LocalizedStrings.ClearRegion_THE_INPUT_REGION_NAME_FOR_THE_CLEAR_REGION_REQUEST_IS_NULL
.toLocalizedString();
- writeErrorResponse(msg, MessageType.CLEAR_REGION_DATA_ERROR, errMessage, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.CLEAR_REGION_DATA_ERROR, errMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -100,35 +100,35 @@ public class ClearRegion extends BaseCommand {
if (region == null) {
String reason = LocalizedStrings.ClearRegion_WAS_NOT_FOUND_DURING_CLEAR_REGION_REGUEST
.toLocalizedString();
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
try {
// Clear the region
this.securityService.authorizeRegionWrite(regionName);
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
RegionClearOperationContext clearContext =
authzRequest.clearAuthorize(regionName, callbackArg);
callbackArg = clearContext.getCallbackArg();
}
- region.basicBridgeClear(callbackArg, servConn.getProxyID(),
+ region.basicBridgeClear(callbackArg, serverConnection.getProxyID(),
true /* boolean from cache Client */, eventId);
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// If an exception occurs during the clear, preserve the connection
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -138,10 +138,10 @@ public class ClearRegion extends BaseCommand {
start = DistributionStats.getStatTime();
stats.incProcessClearRegionTime(start - oldStart);
}
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug(servConn.getName() + ": Sent clear region response for region " + regionName);
+ logger.debug(serverConnection.getName() + ": Sent clear region response for region " + regionName);
}
stats.incWriteClearRegionResponseTime(DistributionStats.getStatTime() - start);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java
index d50e522..053ef8a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ClientReady.java
@@ -35,34 +35,34 @@ public class ClientReady extends BaseCommand {
private ClientReady() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CacheServerStats stats = servConn.getCacheServerStats();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
+ CacheServerStats stats = serverConnection.getCacheServerStats();
{
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadClientReadyRequestTime(start - oldStart);
}
try {
- String clientHost = servConn.getSocketHost();
- int clientPort = servConn.getSocketPort();
+ String clientHost = serverConnection.getSocketHost();
+ int clientPort = serverConnection.getSocketPort();
if (logger.isDebugEnabled()) {
logger.debug("{}: Received client ready request ({} bytes) from {} on {}:{}",
- servConn.getName(), msg.getPayloadLength(), servConn.getProxyID(), clientHost,
+ serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getProxyID(), clientHost,
clientPort);
}
- servConn.getAcceptor().getCacheClientNotifier().readyForEvents(servConn.getProxyID());
+ serverConnection.getAcceptor().getCacheClientNotifier().readyForEvents(serverConnection.getProxyID());
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incProcessClientReadyTime(start - oldStart);
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug(servConn.getName() + ": Processed client ready request from "
- + servConn.getProxyID() + " on " + clientHost + ":" + clientPort);
+ logger.debug(serverConnection.getName() + ": Processed client ready request from "
+ + serverConnection.getProxyID() + " on " + clientHost + ":" + clientPort);
}
} finally {
stats.incWriteClientReadyResponseTime(DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java
index 66045aa..378a322 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CloseConnection.java
@@ -39,43 +39,43 @@ public class CloseConnection extends BaseCommand {
private CloseConnection() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- CacheServerStats stats = servConn.getCacheServerStats();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
+ CacheServerStats stats = serverConnection.getCacheServerStats();
long oldStart = start;
- boolean respondToClient = servConn.getClientVersion().compareTo(Version.GFE_90) >= 0;
+ boolean respondToClient = serverConnection.getClientVersion().compareTo(Version.GFE_90) >= 0;
start = DistributionStats.getStatTime();
stats.incReadCloseConnectionRequestTime(start - oldStart);
if (respondToClient) {
// newer clients will wait for a response or EOFException
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
}
try {
- servConn.setClientDisconnectCleanly();
- String clientHost = servConn.getSocketHost();
- int clientPort = servConn.getSocketPort();
+ serverConnection.setClientDisconnectCleanly();
+ String clientHost = serverConnection.getSocketHost();
+ int clientPort = serverConnection.getSocketPort();
if (logger.isDebugEnabled()) {
- logger.debug("{}: Received close request ({} bytes) from {}:{}", servConn.getName(),
- msg.getPayloadLength(), clientHost, clientPort);
+ logger.debug("{}: Received close request ({} bytes) from {}:{}", serverConnection.getName(),
+ clientMessage.getPayloadLength(), clientHost, clientPort);
}
- Part keepalivePart = msg.getPart(0);
+ Part keepalivePart = clientMessage.getPart(0);
byte[] keepaliveByte = keepalivePart.getSerializedForm();
boolean keepalive = (keepaliveByte == null || keepaliveByte[0] == 0) ? false : true;
- servConn.getAcceptor().getCacheClientNotifier().setKeepAlive(servConn.getProxyID(),
+ serverConnection.getAcceptor().getCacheClientNotifier().setKeepAlive(serverConnection.getProxyID(),
keepalive);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Processed close request from {}:{}, keepAlive: {}", servConn.getName(),
+ logger.debug("{}: Processed close request from {}:{}, keepAlive: {}", serverConnection.getName(),
clientHost, clientPort, keepalive);
}
} finally {
if (respondToClient) {
- writeReply(msg, servConn);
+ writeReply(clientMessage, serverConnection);
}
- servConn.setFlagProcessMessagesAsFalse();
+ serverConnection.setFlagProcessMessagesAsFalse();
stats.incProcessCloseConnectionTime(DistributionStats.getStatTime() - start);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
index 55ef09b..b2bba4f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CommitCommand.java
@@ -50,12 +50,12 @@ public class CommitCommand extends BaseCommand {
private CommitCommand() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
- servConn.setAsTrue(REQUIRES_RESPONSE);
- TXManagerImpl txMgr = (TXManagerImpl) servConn.getCache().getCacheTransactionManager();
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
+ TXManagerImpl txMgr = (TXManagerImpl) serverConnection.getCache().getCacheTransactionManager();
InternalDistributedMember client =
- (InternalDistributedMember) servConn.getProxyID().getDistributedMember();
- int uniqId = msg.getTransactionId();
+ (InternalDistributedMember) serverConnection.getProxyID().getDistributedMember();
+ int uniqId = clientMessage.getTransactionId();
TXId txId = new TXId(client, uniqId);
TXCommitMessage commitMsg = null;
if (txMgr.isHostedTxRecentlyCompleted(txId)) {
@@ -64,11 +64,11 @@ public class CommitCommand extends BaseCommand {
logger.debug("TX: returning a recently committed txMessage for tx: {}", txId);
}
if (!txMgr.isExceptionToken(commitMsg)) {
- writeCommitResponse(commitMsg, msg, servConn);
+ writeCommitResponse(commitMsg, clientMessage, serverConnection);
commitMsg.setClientVersion(null); // fixes bug 46529
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
} else {
- sendException(msg, servConn, txMgr.getExceptionForToken(commitMsg, txId));
+ sendException(clientMessage, serverConnection, txMgr.getExceptionForToken(commitMsg, txId));
}
txMgr.removeHostedTXState(txId);
return;
@@ -87,10 +87,10 @@ public class CommitCommand extends BaseCommand {
txMgr.commit();
commitMsg = txProxy.getCommitMessage();
- writeCommitResponse(commitMsg, msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeCommitResponse(commitMsg, clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
} catch (Exception e) {
- sendException(msg, servConn, e);
+ sendException(clientMessage, serverConnection, e);
} finally {
if (txId != null) {
txMgr.removeHostedTXState(txId);
@@ -115,7 +115,7 @@ public class CommitCommand extends BaseCommand {
if (response != null) {
response.setClientVersion(servConn.getClientVersion());
}
- responseMsg.addObjPart(response, zipValues);
+ responseMsg.addObjPart(response, false);
servConn.getCache().getCancelCriterion().checkCancelInProgress(null);
if (logger.isDebugEnabled()) {
logger.debug("TX: sending a nonNull response for transaction: {}",
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java
index c1b67e1..50d1197 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey.java
@@ -51,34 +51,34 @@ public class ContainsKey extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
Part regionNamePart = null;
Part keyPart = null;
String regionName = null;
Object key = null;
- CacheServerStats stats = servConn.getCacheServerStats();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadContainsKeyRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
- keyPart = msg.getPart(1);
+ regionNamePart = clientMessage.getPart(0);
+ keyPart = clientMessage.getPart(1);
regionName = regionNamePart.getString();
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Received containsKey request ({} bytes) from {} for region {} key {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key);
+ serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key);
}
// Process the containsKey request
@@ -87,47 +87,47 @@ public class ContainsKey extends BaseCommand {
if (key == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ContainsKey_0_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage = LocalizedStrings.ContainsKey_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL
.toLocalizedString();
}
if (regionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ContainsKey_0_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage =
LocalizedStrings.ContainsKey_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL
.toLocalizedString();
}
- writeErrorResponse(msg, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason =
LocalizedStrings.ContainsKey_WAS_NOT_FOUND_DURING_CONTAINSKEY_REQUEST.toLocalizedString();
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
try {
this.securityService.authorizeRegionRead(regionName, key.toString());
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
try {
authzRequest.containsKeyAuthorize(regionName, key);
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -140,10 +140,10 @@ public class ContainsKey extends BaseCommand {
start = DistributionStats.getStatTime();
stats.incProcessContainsKeyTime(start - oldStart);
}
- writeContainsKeyResponse(containsKey, msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeContainsKeyResponse(containsKey, clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent containsKey response for region {} key {}", servConn.getName(),
+ logger.debug("{}: Sent containsKey response for region {} key {}", serverConnection.getName(),
regionName, key);
}
stats.incWriteContainsKeyResponseTime(DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java
index dc8f9eb..53bb414 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/ContainsKey66.java
@@ -55,34 +55,34 @@ public class ContainsKey66 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
Part regionNamePart = null, keyPart = null;
String regionName = null;
Object key = null;
ContainsKeyOp.MODE mode;
- CacheServerStats stats = servConn.getCacheServerStats();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
start = DistributionStats.getStatTime();
stats.incReadContainsKeyRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
- keyPart = msg.getPart(1);
- mode = ContainsKeyOp.MODE.values()[(msg.getPart(2).getInt())];
+ regionNamePart = clientMessage.getPart(0);
+ keyPart = clientMessage.getPart(1);
+ mode = ContainsKeyOp.MODE.values()[(clientMessage.getPart(2).getInt())];
regionName = regionNamePart.getString();
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Received containsKey request ({} bytes) from {} for region {} key {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key);
+ serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key);
}
// Process the containsKey request
@@ -91,46 +91,46 @@ public class ContainsKey66 extends BaseCommand {
if (key == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ContainsKey_0_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage = LocalizedStrings.ContainsKey_THE_INPUT_KEY_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL
.toLocalizedString();
}
if (regionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.ContainsKey_0_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage =
LocalizedStrings.ContainsKey_THE_INPUT_REGION_NAME_FOR_THE_CONTAINSKEY_REQUEST_IS_NULL
.toLocalizedString();
}
- writeErrorResponse(msg, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.CONTAINS_KEY_DATA_ERROR, errMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason =
LocalizedStrings.ContainsKey_WAS_NOT_FOUND_DURING_CONTAINSKEY_REQUEST.toLocalizedString();
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
try {
this.securityService.authorizeRegionRead(regionName, key.toString());
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
try {
authzRequest.containsKeyAuthorize(regionName, key);
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -157,10 +157,10 @@ public class ContainsKey66 extends BaseCommand {
start = DistributionStats.getStatTime();
stats.incProcessContainsKeyTime(start - oldStart);
}
- writeContainsKeyResponse(containsKey, msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeContainsKeyResponse(containsKey, clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent containsKey response for region {} key {}", servConn.getName(),
+ logger.debug("{}: Sent containsKey response for region {} key {}", serverConnection.getName(),
regionName, key);
}
stats.incWriteContainsKeyResponseTime(DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java
index d84dc62..b7ab01b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/CreateRegion.java
@@ -41,25 +41,25 @@ public class CreateRegion extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
Part regionNamePart = null;
String regionName = null;
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
// bserverStats.incLong(readDestroyRequestTimeId,
// DistributionStats.getStatTime() - start);
// bserverStats.incInt(destroyRequestsId, 1);
// start = DistributionStats.getStatTime();
// Retrieve the data from the message parts
- Part parentRegionNamePart = msg.getPart(0);
+ Part parentRegionNamePart = clientMessage.getPart(0);
String parentRegionName = parentRegionNamePart.getString();
- regionNamePart = msg.getPart(1);
+ regionNamePart = clientMessage.getPart(1);
regionName = regionNamePart.getString();
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Received create region request ({} bytes) from {} for parent region {} region {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), parentRegionName,
+ serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), parentRegionName,
regionName);
}
@@ -69,7 +69,7 @@ public class CreateRegion extends BaseCommand {
if (parentRegionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.CreateRegion_0_THE_INPUT_PARENT_REGION_NAME_FOR_THE_CREATE_REGION_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage =
LocalizedStrings.CreateRegion_THE_INPUT_PARENT_REGION_NAME_FOR_THE_CREATE_REGION_REQUEST_IS_NULL
.toLocalizedString();
@@ -77,41 +77,41 @@ public class CreateRegion extends BaseCommand {
if (regionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.CreateRegion_0_THE_INPUT_REGION_NAME_FOR_THE_CREATE_REGION_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage =
LocalizedStrings.CreateRegion_THE_INPUT_REGION_NAME_FOR_THE_CREATE_REGION_REQUEST_IS_NULL
.toLocalizedString();
}
- writeErrorResponse(msg, MessageType.CREATE_REGION_DATA_ERROR, errMessage, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.CREATE_REGION_DATA_ERROR, errMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- Region parentRegion = servConn.getCache().getRegion(parentRegionName);
+ Region parentRegion = serverConnection.getCache().getRegion(parentRegionName);
if (parentRegion == null) {
String reason =
LocalizedStrings.CreateRegion__0_WAS_NOT_FOUND_DURING_SUBREGION_CREATION_REQUEST
.toLocalizedString(parentRegionName);
- writeRegionDestroyedEx(msg, parentRegionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, parentRegionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
try {
this.securityService.authorizeDataManage();
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
try {
authzRequest.createRegionAuthorize(parentRegionName + '/' + regionName);
} catch (NotAuthorizedException ex) {
- writeException(msg, ex, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, ex, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -121,11 +121,11 @@ public class CreateRegion extends BaseCommand {
AttributesFactory factory = new AttributesFactory(parentRegion.getAttributes());
region = parentRegion.createSubregion(regionName, factory.create());
if (logger.isDebugEnabled()) {
- logger.debug("{}: Created region {}", servConn.getName(), region);
+ logger.debug("{}: Created region {}", serverConnection.getName(), region);
}
} else {
if (logger.isDebugEnabled()) {
- logger.debug("{}: Retrieved region {}", servConn.getName(), region);
+ logger.debug("{}: Retrieved region {}", serverConnection.getName(), region);
}
}
@@ -134,11 +134,11 @@ public class CreateRegion extends BaseCommand {
// NOT USING IT
// bserverStats.incLong(processDestroyTimeId,
// DistributionStats.getStatTime() - start);
- writeReply(msg, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeReply(clientMessage, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
logger.debug("{}: Sent create region response for parent region {} region {}",
- servConn.getName(), parentRegionName, regionName);
+ serverConnection.getName(), parentRegionName, regionName);
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java
index 1497044..359e1b4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Default.java
@@ -37,14 +37,15 @@ public class Default extends BaseCommand {
private Default() {}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start) throws IOException {
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start) throws IOException {
// requiresResponse = true; NOT NEEDED... ALWAYS SEND ERROR RESPONSE
logger.fatal(
LocalizedMessage.create(LocalizedStrings.Default_0_UNKNOWN_MESSAGE_TYPE_1_WITH_TX_2_FROM_3,
- new Object[] {servConn.getName(), MessageType.getString(msg.getMessageType()),
- Integer.valueOf(msg.getTransactionId()), servConn.getSocketString()}));
- writeErrorResponse(msg, MessageType.UNKNOWN_MESSAGE_TYPE_ERROR, servConn);
+ new Object[] {
+ serverConnection.getName(), MessageType.getString(clientMessage.getMessageType()),
+ Integer.valueOf(clientMessage.getTransactionId()), serverConnection.getSocketString()}));
+ writeErrorResponse(clientMessage, MessageType.UNKNOWN_MESSAGE_TYPE_ERROR, serverConnection);
// responded = true; NOT NEEDED... ALWAYS SEND ERROR RESPONSE
}
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java
index 5996984..0699c8b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy.java
@@ -48,7 +48,7 @@ public class Destroy extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long startparam)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long startparam)
throws IOException, InterruptedException {
long start = startparam;
@@ -57,8 +57,8 @@ public class Destroy extends BaseCommand {
Object callbackArg = null, key = null;
Part eventPart = null;
StringBuffer errMessage = new StringBuffer();
- CacheServerStats stats = servConn.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ CacheServerStats stats = serverConnection.getCacheServerStats();
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
{
long oldStart = start;
@@ -66,17 +66,17 @@ public class Destroy extends BaseCommand {
stats.incReadDestroyRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
- keyPart = msg.getPart(1);
- eventPart = msg.getPart(2);
+ regionNamePart = clientMessage.getPart(0);
+ keyPart = clientMessage.getPart(1);
+ eventPart = clientMessage.getPart(2);
// callbackArgPart = null; (redundant assignment)
- if (msg.getNumberOfParts() > 3) {
- callbackArgPart = msg.getPart(3);
+ if (clientMessage.getNumberOfParts() > 3) {
+ callbackArgPart = clientMessage.getPart(3);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -84,13 +84,13 @@ public class Destroy extends BaseCommand {
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug("{}: Received destroy request ({} bytes) from {} for region {} key {}",
- servConn.getName(), msg.getPayloadLength(), servConn.getSocketString(), regionName, key);
+ serverConnection.getName(), clientMessage.getPayloadLength(), serverConnection.getSocketString(), regionName, key);
}
// Process the destroy request
@@ -98,29 +98,29 @@ public class Destroy extends BaseCommand {
if (key == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.Destroy_0_THE_INPUT_KEY_FOR_THE_DESTROY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage.append(LocalizedStrings.Destroy__THE_INPUT_KEY_FOR_THE_DESTROY_REQUEST_IS_NULL
.toLocalizedString());
}
if (regionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.Destroy_0_THE_INPUT_REGION_NAME_FOR_THE_DESTROY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage
.append(LocalizedStrings.Destroy__THE_INPUT_REGION_NAME_FOR_THE_DESTROY_REQUEST_IS_NULL
.toLocalizedString());
}
- writeErrorResponse(msg, MessageType.DESTROY_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.DESTROY_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason = LocalizedStrings.Destroy__0_WAS_NOT_FOUND_DURING_DESTROY_REQUEST
.toLocalizedString(regionName);
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -128,13 +128,13 @@ public class Destroy extends BaseCommand {
ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
try {
// for integrated security
this.securityService.authorizeRegionWrite(regionName, key.toString());
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
RegionDestroyOperationContext destroyContext =
@@ -146,35 +146,35 @@ public class Destroy extends BaseCommand {
callbackArg = destroyContext.getCallbackArg();
}
}
- region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(), true,
+ region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), true,
new EventIDHolder(eventId));
- servConn.setModificationInfo(true, regionName, key);
+ serverConnection.setModificationInfo(true, regionName, key);
} catch (EntryNotFoundException e) {
// Don't send an exception back to the client if this
// exception happens. Just log it and continue.
logger.info(LocalizedMessage.create(
LocalizedStrings.Destroy_0_DURING_ENTRY_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
- new Object[] {servConn.getName(), key}));
+ new Object[] { serverConnection.getName(), key}));
} catch (RegionDestroyedException rde) {
- writeException(msg, rde, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, rde, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// If an exception occurs during the destroy, preserve the connection
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (e instanceof GemFireSecurityException) {
// Fine logging for security exceptions since these are already
// logged by the security logger
if (logger.isDebugEnabled()) {
- logger.debug("{}: Unexpected Security exception", servConn.getName(), e);
+ logger.debug("{}: Unexpected Security exception", serverConnection.getName(), e);
}
} else {
logger.warn(LocalizedMessage.create(LocalizedStrings.Destroy_0_UNEXPECTED_EXCEPTION,
- servConn.getName()), e);
+ serverConnection.getName()), e);
}
return;
}
@@ -188,17 +188,17 @@ public class Destroy extends BaseCommand {
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion) region;
if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
- writeReplyWithRefreshMetadata(msg, servConn, pr, pr.getNetworkHopType());
+ writeReplyWithRefreshMetadata(clientMessage, serverConnection, pr, pr.getNetworkHopType());
pr.clearNetworkHopData();
} else {
- writeReply(msg, servConn);
+ writeReply(clientMessage, serverConnection);
}
} else {
- writeReply(msg, servConn);
+ writeReply(clientMessage, serverConnection);
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent destroy response for region {} key {}", servConn.getName(), regionName,
+ logger.debug("{}: Sent destroy response for region {} key {}", serverConnection.getName(), regionName,
key);
}
stats.incWriteDestroyResponseTime(DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java
index 585f57d..0ee0fc4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java
@@ -54,7 +54,7 @@ public class Destroy65 extends BaseCommand {
}
@Override
- protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection servConn,
+ protected void writeReplyWithRefreshMetadata(Message origMsg, ServerConnection serverConnection,
PartitionedRegion pr, byte nwHop) throws IOException {
throw new UnsupportedOperationException();
}
@@ -72,7 +72,7 @@ public class Destroy65 extends BaseCommand {
replyMsg.addIntPart(entryNotFoundForRemove ? 1 : 0);
replyMsg.send(servConn);
if (logger.isTraceEnabled()) {
- logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(),
+ logger.trace("{}: rpl with REFRESH_METADATA tx: {}", servConn.getName(),
origMsg.getTransactionId());
}
}
@@ -84,7 +84,7 @@ public class Destroy65 extends BaseCommand {
replyMsg.setMessageType(MessageType.REPLY);
replyMsg.setNumberOfParts(2);
replyMsg.setTransactionId(origMsg.getTransactionId());
- replyMsg.addBytesPart(OK_BYTES);
+ replyMsg.addBytesPart(okBytes());
replyMsg.addIntPart(entryNotFound ? 1 : 0);
replyMsg.send(servConn);
if (logger.isTraceEnabled()) {
@@ -94,7 +94,7 @@ public class Destroy65 extends BaseCommand {
}
@Override
- public void cmdExecute(Message msg, ServerConnection servConn, long start)
+ public void cmdExecute(Message clientMessage, ServerConnection serverConnection, long start)
throws IOException, InterruptedException {
Part regionNamePart;
Part keyPart;
@@ -108,20 +108,20 @@ public class Destroy65 extends BaseCommand {
String regionName = null;
Object callbackArg = null, key = null;
StringBuffer errMessage = new StringBuffer();
- CachedRegionHelper crHelper = servConn.getCachedRegionHelper();
- CacheServerStats stats = servConn.getCacheServerStats();
- servConn.setAsTrue(REQUIRES_RESPONSE);
+ CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
+ CacheServerStats stats = serverConnection.getCacheServerStats();
+ serverConnection.setAsTrue(REQUIRES_RESPONSE);
long now = DistributionStats.getStatTime();
stats.incReadDestroyRequestTime(now - start);
// Retrieve the data from the message parts
- regionNamePart = msg.getPart(0);
- keyPart = msg.getPart(1);
- expectedOldValuePart = msg.getPart(2);
+ regionNamePart = clientMessage.getPart(0);
+ keyPart = clientMessage.getPart(1);
+ expectedOldValuePart = clientMessage.getPart(2);
try {
- operation = msg.getPart(3).getObject();
+ operation = clientMessage.getPart(3).getObject();
if (((operation instanceof Operation) && ((Operation) operation == Operation.REMOVE))
|| ((operation instanceof Byte) && (Byte) operation == OpType.DESTROY))
@@ -130,20 +130,20 @@ public class Destroy65 extends BaseCommand {
expectedOldValue = expectedOldValuePart.getObject();
}
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- eventPart = msg.getPart(4);
+ eventPart = clientMessage.getPart(4);
- if (msg.getNumberOfParts() > 5) {
- callbackArgPart = msg.getPart(5);
+ if (clientMessage.getNumberOfParts() > 5) {
+ callbackArgPart = clientMessage.getPart(5);
try {
callbackArg = callbackArgPart.getObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
}
@@ -151,16 +151,16 @@ public class Destroy65 extends BaseCommand {
try {
key = keyPart.getStringOrObject();
} catch (Exception e) {
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
if (logger.isDebugEnabled()) {
logger.debug(
"{}: Received destroy65 request ({} bytes; op={}) from {} for region {} key {}{} txId {}",
- servConn.getName(), msg.getPayloadLength(), operation, servConn.getSocketString(),
+ serverConnection.getName(), clientMessage.getPayloadLength(), operation, serverConnection.getSocketString(),
regionName, key, (operation == Operation.REMOVE ? " value=" + expectedOldValue : ""),
- msg.getTransactionId());
+ clientMessage.getTransactionId());
}
boolean entryNotFoundForRemove = false;
@@ -169,29 +169,29 @@ public class Destroy65 extends BaseCommand {
if (key == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.Destroy_0_THE_INPUT_KEY_FOR_THE_DESTROY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage.append(LocalizedStrings.Destroy__THE_INPUT_KEY_FOR_THE_DESTROY_REQUEST_IS_NULL
.toLocalizedString());
}
if (regionName == null) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.Destroy_0_THE_INPUT_REGION_NAME_FOR_THE_DESTROY_REQUEST_IS_NULL,
- servConn.getName()));
+ serverConnection.getName()));
errMessage
.append(LocalizedStrings.Destroy__THE_INPUT_REGION_NAME_FOR_THE_DESTROY_REQUEST_IS_NULL
.toLocalizedString());
}
- writeErrorResponse(msg, MessageType.DESTROY_DATA_ERROR, errMessage.toString(), servConn);
- servConn.setAsTrue(RESPONDED);
+ writeErrorResponse(clientMessage, MessageType.DESTROY_DATA_ERROR, errMessage.toString(), serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
- LocalRegion region = (LocalRegion) servConn.getCache().getRegion(regionName);
+ LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
if (region == null) {
String reason = LocalizedStrings.Destroy__0_WAS_NOT_FOUND_DURING_DESTROY_REQUEST
.toLocalizedString(regionName);
- writeRegionDestroyedEx(msg, regionName, reason, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
}
@@ -199,13 +199,13 @@ public class Destroy65 extends BaseCommand {
ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
- EventID eventId = new EventID(servConn.getEventMemberIDByteArray(), threadId, sequenceId);
+ EventID eventId = new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
EventIDHolder clientEvent = new EventIDHolder(eventId);
Breadcrumbs.setEventId(eventId);
// msg.isRetry might be set by v7.0 and later clients
- if (msg.isRetry()) {
+ if (clientMessage.isRetry()) {
// if (logger.isDebugEnabled()) {
// logger.debug("DEBUG: encountered isRetry in Destroy65");
// }
@@ -223,7 +223,7 @@ public class Destroy65 extends BaseCommand {
// for integrated security
this.securityService.authorizeRegionWrite(regionName, key.toString());
- AuthorizeRequest authzRequest = servConn.getAuthzRequest();
+ AuthorizeRequest authzRequest = serverConnection.getAuthzRequest();
if (authzRequest != null) {
if (DynamicRegionFactory.regionIsDynamicRegionList(regionName)) {
RegionDestroyOperationContext destroyContext =
@@ -236,14 +236,14 @@ public class Destroy65 extends BaseCommand {
}
}
if (operation == null || operation == Operation.DESTROY) {
- region.basicBridgeDestroy(key, callbackArg, servConn.getProxyID(), true, clientEvent);
+ region.basicBridgeDestroy(key, callbackArg, serverConnection.getProxyID(), true, clientEvent);
} else {
// this throws exceptions if expectedOldValue checks fail
try {
if (expectedOldValue == null) {
expectedOldValue = Token.INVALID;
}
- if (operation == Operation.REMOVE && msg.isRetry()
+ if (operation == Operation.REMOVE && clientMessage.isRetry()
&& clientEvent.getVersionTag() != null) {
// the operation was successful last time it was tried, so there's
// no need to perform it again. Just return the version tag and
@@ -254,55 +254,55 @@ public class Destroy65 extends BaseCommand {
}
// try the operation anyway to ensure that it's been distributed to all servers
try {
- region.basicBridgeRemove(key, expectedOldValue, callbackArg, servConn.getProxyID(),
+ region.basicBridgeRemove(key, expectedOldValue, callbackArg, serverConnection.getProxyID(),
true, clientEvent);
} catch (EntryNotFoundException e) {
// ignore, and don't set entryNotFoundForRemove because this was a successful
// operation - bug #51664
}
} else {
- region.basicBridgeRemove(key, expectedOldValue, callbackArg, servConn.getProxyID(),
+ region.basicBridgeRemove(key, expectedOldValue, callbackArg, serverConnection.getProxyID(),
true, clientEvent);
if (logger.isDebugEnabled()) {
logger.debug("region.remove succeeded");
}
}
} catch (EntryNotFoundException e) {
- servConn.setModificationInfo(true, regionName, key);
+ serverConnection.setModificationInfo(true, regionName, key);
if (logger.isDebugEnabled()) {
logger.debug("writing entryNotFound response");
}
entryNotFoundForRemove = true;
}
}
- servConn.setModificationInfo(true, regionName, key);
+ serverConnection.setModificationInfo(true, regionName, key);
} catch (EntryNotFoundException e) {
// Don't send an exception back to the client if this
// exception happens. Just log it and continue.
logger.info(LocalizedMessage.create(
LocalizedStrings.Destroy_0_DURING_ENTRY_DESTROY_NO_ENTRY_WAS_FOUND_FOR_KEY_1,
- new Object[] {servConn.getName(), key}));
+ new Object[] { serverConnection.getName(), key}));
entryNotFoundForRemove = true;
} catch (RegionDestroyedException rde) {
- writeException(msg, rde, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, rde, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
return;
} catch (Exception e) {
// If an interrupted exception is thrown , rethrow it
- checkForInterrupt(servConn, e);
+ checkForInterrupt(serverConnection, e);
// If an exception occurs during the destroy, preserve the connection
- writeException(msg, e, false, servConn);
- servConn.setAsTrue(RESPONDED);
+ writeException(clientMessage, e, false, serverConnection);
+ serverConnection.setAsTrue(RESPONDED);
if (e instanceof GemFireSecurityException) {
// Fine logging for security exceptions since these are already
// logged by the security logger
if (logger.isDebugEnabled()) {
- logger.debug("{}: Unexpected Security exception", servConn.getName(), e);
+ logger.debug("{}: Unexpected Security exception", serverConnection.getName(), e);
}
} else {
logger.warn(LocalizedMessage.create(LocalizedStrings.Destroy_0_UNEXPECTED_EXCEPTION,
- servConn.getName()), e);
+ serverConnection.getName()), e);
}
return;
}
@@ -314,20 +314,20 @@ public class Destroy65 extends BaseCommand {
if (region instanceof PartitionedRegion) {
PartitionedRegion pr = (PartitionedRegion) region;
if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
- writeReplyWithRefreshMetadata(msg, servConn, pr, entryNotFoundForRemove,
+ writeReplyWithRefreshMetadata(clientMessage, serverConnection, pr, entryNotFoundForRemove,
pr.getNetworkHopType(), clientEvent.getVersionTag());
pr.clearNetworkHopData();
} else {
- writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(),
+ writeReply(clientMessage, serverConnection, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(),
clientEvent.getVersionTag());
}
} else {
- writeReply(msg, servConn, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(),
+ writeReply(clientMessage, serverConnection, entryNotFoundForRemove | clientEvent.getIsRedestroyedEntry(),
clientEvent.getVersionTag());
}
- servConn.setAsTrue(RESPONDED);
+ serverConnection.setAsTrue(RESPONDED);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Sent destroy response for region {} key {}", servConn.getName(), regionName,
+ logger.debug("{}: Sent destroy response for region {} key {}", serverConnection.getName(), regionName,
key);
}
stats.incWriteDestroyResponseTime(DistributionStats.getStatTime() - start);
http://git-wip-us.apache.org/repos/asf/geode/blob/0a5b8d38/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java
index 59a7233..7c07c72 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy70.java
@@ -67,7 +67,7 @@ public class Destroy70 extends Destroy65 {
pr.getPrStats().incPRMetaDataSentCount();
replyMsg.send(servConn);
if (logger.isTraceEnabled()) {
- logger.trace("{}: rpl with REFRESH_METADAT tx: {}", servConn.getName(),
+ logger.trace("{}: rpl with REFRESH_METADATA tx: {}", servConn.getName(),
origMsg.getTransactionId());
}
}
@@ -104,7 +104,7 @@ public class Destroy70 extends Destroy65 {
// logger.fine("response has no version tag");
// }
}
- replyMsg.addBytesPart(OK_BYTES); // make old single-hop code happy by puting byte[]{0} here
+ replyMsg.addBytesPart(okBytes()); // make old single-hop code happy by puting byte[]{0} here
replyMsg.addIntPart(entryNotFound ? 1 : 0);
replyMsg.send(servConn);
if (logger.isTraceEnabled()) {