You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2017/09/14 21:47:34 UTC
[geode] branch develop updated: GEODE-3083: New protocol should
record statistics
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 3d8f016 GEODE-3083: New protocol should record statistics
3d8f016 is described below
commit 3d8f0160c3725108bc96286806c07c5dcecc43a8
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue Sep 12 10:01:56 2017 -0700
GEODE-3083: New protocol should record statistics
Statistics implemented: connections, connection starts, connection termination,
bytes/second received, bytes/second sent, authorization violations
---
.../distributed/internal/InternalLocator.java | 7 +
.../distributed/internal/tcpserver/TcpServer.java | 8 +
.../apache/geode/internal/cache/tier/Acceptor.java | 6 +
.../internal/cache/tier/sockets/AcceptorImpl.java | 15 +-
.../cache/tier/sockets/CacheServerStats.java | 282 +++++++++++----------
.../tier/sockets/ClientProtocolMessageHandler.java | 7 +
...eHandler.java => ClientProtocolStatistics.java} | 20 +-
.../sockets/GenericProtocolServerConnection.java | 30 ++-
.../tier/sockets/MessageExecutionContext.java | 19 +-
...ocolMessageHandler.java => NoOpStatistics.java} | 23 +-
.../tier/sockets/ServerConnectionFactory.java | 26 +-
.../tier/sockets/ServerConnectionFactoryTest.java | 2 +-
.../cache/tier/sockets/ServerConnectionTest.java | 6 +-
.../protocol/protobuf/ProtobufOpsProcessor.java | 7 +
.../protocol/protobuf/ProtobufStreamProcessor.java | 17 ++
.../statistics/ProtobufClientStatistics.java | 27 +-
.../statistics/ProtobufClientStatisticsImpl.java | 104 ++++++++
.../GenericProtocolServerConnectionTest.java | 7 +-
.../RoundTripCacheConnectionJUnitTest.java | 61 +++++
.../RoundTripLocatorConnectionDUnitTest.java | 18 ++
.../protobuf/ProtobufStreamProcessorTest.java | 5 +-
.../GetAllRequestOperationHandlerJUnitTest.java | 22 +-
...egionNamesRequestOperationHandlerJUnitTest.java | 10 +-
.../GetRegionRequestOperationHandlerJUnitTest.java | 30 ++-
.../GetRequestOperationHandlerJUnitTest.java | 37 +--
.../PutAllRequestOperationHandlerJUnitTest.java | 48 ++--
.../PutRequestOperationHandlerJUnitTest.java | 44 ++--
.../RemoveRequestOperationHandlerJUnitTest.java | 40 +--
.../statistics/NoOpProtobufStatistics.java | 45 ++--
29 files changed, 643 insertions(+), 330 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 06603cc..c4541c3 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -62,6 +62,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
import org.apache.geode.internal.cache.wan.WANServiceProvider;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -1334,6 +1335,12 @@ public class InternalLocator extends Locator implements ConnectListener {
try {
this.stats.hookupStats(sys,
SocketCreator.getLocalHost().getCanonicalHostName() + '-' + this.server.getBindAddress());
+ ClientProtocolMessageHandler messageHandler = this.server.getMessageHandler();
+ if (messageHandler != null) {
+ // GEODE-3546 - this should create locator-specific stats but is creating client/server
+ // stats
+ messageHandler.initializeStatistics("LocatorStats", sys);
+ }
} catch (UnknownHostException e) {
logger.warn(e);
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index a721589..cf8e477 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -155,6 +155,13 @@ public class TcpServer {
GOSSIP_TO_GEMFIRE_VERSION_MAP.put(OLDGOSSIPVERSION, Version.GFE_57.ordinal());
}
+ /**
+ * returns the message handler used for client/locator communications processing
+ */
+ public ClientProtocolMessageHandler getMessageHandler() {
+ return messageHandler;
+ }
+
public TcpServer(int port, InetAddress bind_address, Properties sslConfig,
DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
ThreadGroup threadGroup, String threadName, InternalLocator internalLocator,
@@ -575,4 +582,5 @@ public class TcpServer {
public static Map getGossipVersionMapForTestOnly() {
return GOSSIP_TO_GEMFIRE_VERSION_MAP;
}
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
index f062129..9596f74 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java
@@ -50,6 +50,12 @@ public abstract class Acceptor {
public abstract int getPort();
/**
+ * returns the server's name string, including the inet address and port that the server is
+ * listening on
+ */
+ public abstract String getServerName();
+
+ /**
* Closes this acceptor thread
*/
public abstract void close();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index f9bc596..68377d8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -511,7 +511,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
}
}
this.localPort = port;
- String sockName = this.serverSock.getLocalSocketAddress().toString();
+ String sockName = getServerName();
logger.info(LocalizedMessage.create(
LocalizedStrings.AcceptorImpl_CACHE_SERVER_CONNECTION_LISTENER_BOUND_TO_ADDRESS_0_WITH_BACKLOG_1,
new Object[] {sockName, Integer.valueOf(backLog)}));
@@ -1178,6 +1178,17 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
return localPort;
}
+ @Override
+ public String getServerName() {
+ String name = this.serverSock.getLocalSocketAddress().toString();
+ try {
+ name = SocketCreator.getLocalHost().getCanonicalHostName() + "-" + name;
+ } catch (Exception e) {
+ }
+ return name;
+ }
+
+
public InetAddress getServerInetAddr() {
return this.serverSock.getInetAddress();
}
@@ -1487,7 +1498,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
ServerConnection serverConn = serverConnectionFactory.makeServerConnection(socket, this.cache,
this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, this.socketBufferSize,
- communicationModeStr, communicationMode, this, this.securityService);
+ communicationModeStr, communicationMode, this, this.securityService, this.getBindAddress());
synchronized (this.allSCsLock) {
this.allSCs.add(serverConn);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
index 4f2a0b3..1849487 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheServerStats.java
@@ -233,261 +233,263 @@ public class CacheServerStats implements MessageStats {
* Add a convinience method to pass in a StatisticsFactory for Statistics construction. Helpful
* for local Statistics operations
*
- * @param f
+ * @param statisticsFactory
* @param ownerName
*/
- public CacheServerStats(StatisticsFactory f, String ownerName, String typeName,
- StatisticDescriptor[] descriptiors) {
- if (f == null) {
+ public CacheServerStats(StatisticsFactory statisticsFactory, String ownerName, String typeName,
+ StatisticDescriptor[] descriptors) {
+ if (statisticsFactory == null) {
// Create statistics later when needed
return;
}
StatisticDescriptor[] serverStatDescriptors = new StatisticDescriptor[] {
- f.createIntCounter("getRequests", "Number of cache client get requests.", "operations"),
- f.createLongCounter("readGetRequestTime", "Total time spent in reading get requests.",
- "nanoseconds"),
- f.createLongCounter("processGetTime",
- "Total time spent in processing a cache client get request, including the time to get an object from the cache.",
- "nanoseconds"),
- f.createIntCounter("getResponses", "Number of get responses written to the cache client.",
- "operations"),
- f.createLongCounter("writeGetResponseTime", "Total time spent in writing get responses.",
- "nanoseconds"),
-
- f.createIntCounter("putRequests", "Number of cache client put requests.", "operations"),
- f.createLongCounter("readPutRequestTime", "Total time spent in reading put requests.",
- "nanoseconds"),
- f.createLongCounter("processPutTime",
- "Total time spent in processing a cache client put request, including the time to put an object into the cache.",
- "nanoseconds"),
- f.createIntCounter("putResponses", "Number of put responses written to the cache client.",
+ statisticsFactory.createIntCounter("getRequests", "Number of cache client get requests.",
"operations"),
- f.createLongCounter("writePutResponseTime", "Total time spent in writing put responses.",
+ statisticsFactory.createLongCounter("readGetRequestTime",
+ "Total time spent in reading get requests.", "nanoseconds"),
+ statisticsFactory.createLongCounter("processGetTime",
+ "Total time spent in processing a cache client get request, including the time to get an object from the cache.",
"nanoseconds"),
+ statisticsFactory.createIntCounter("getResponses",
+ "Number of get responses written to the cache client.", "operations"),
+ statisticsFactory.createLongCounter("writeGetResponseTime",
+ "Total time spent in writing get responses.", "nanoseconds"),
- f.createIntCounter("putAllRequests", "Number of cache client putAll requests.",
+ statisticsFactory.createIntCounter("putRequests", "Number of cache client put requests.",
"operations"),
- f.createLongCounter("readPutAllRequestTime", "Total time spent in reading putAll requests.",
+ statisticsFactory.createLongCounter("readPutRequestTime",
+ "Total time spent in reading put requests.", "nanoseconds"),
+ statisticsFactory.createLongCounter("processPutTime",
+ "Total time spent in processing a cache client put request, including the time to put an object into the cache.",
"nanoseconds"),
- f.createLongCounter("processPutAllTime",
+ statisticsFactory.createIntCounter("putResponses",
+ "Number of put responses written to the cache client.", "operations"),
+ statisticsFactory.createLongCounter("writePutResponseTime",
+ "Total time spent in writing put responses.", "nanoseconds"),
+
+ statisticsFactory.createIntCounter("putAllRequests",
+ "Number of cache client putAll requests.", "operations"),
+ statisticsFactory.createLongCounter("readPutAllRequestTime",
+ "Total time spent in reading putAll requests.", "nanoseconds"),
+ statisticsFactory.createLongCounter("processPutAllTime",
"Total time spent in processing a cache client putAll request, including the time to put all objects into the cache.",
"nanoseconds"),
- f.createIntCounter("putAllResponses",
+ statisticsFactory.createIntCounter("putAllResponses",
"Number of putAll responses written to the cache client.", "operations"),
- f.createLongCounter("writePutAllResponseTime",
+ statisticsFactory.createLongCounter("writePutAllResponseTime",
"Total time spent in writing putAll responses.", "nanoseconds"),
- f.createIntCounter("removeAllRequests", "Number of cache client removeAll requests.",
- "operations"),
- f.createLongCounter("readRemoveAllRequestTime",
+ statisticsFactory.createIntCounter("removeAllRequests",
+ "Number of cache client removeAll requests.", "operations"),
+ statisticsFactory.createLongCounter("readRemoveAllRequestTime",
"Total time spent in reading removeAll requests.", "nanoseconds"),
- f.createLongCounter("processRemoveAllTime",
+ statisticsFactory.createLongCounter("processRemoveAllTime",
"Total time spent in processing a cache client removeAll request, including the time to remove all objects from the cache.",
"nanoseconds"),
- f.createIntCounter("removeAllResponses",
+ statisticsFactory.createIntCounter("removeAllResponses",
"Number of removeAll responses written to the cache client.", "operations"),
- f.createLongCounter("writeRemoveAllResponseTime",
+ statisticsFactory.createLongCounter("writeRemoveAllResponseTime",
"Total time spent in writing removeAll responses.", "nanoseconds"),
- f.createIntCounter("getAllRequests", "Number of cache client getAll requests.",
- "operations"),
- f.createLongCounter("readGetAllRequestTime", "Total time spent in reading getAll requests.",
- "nanoseconds"),
- f.createLongCounter("processGetAllTime",
+ statisticsFactory.createIntCounter("getAllRequests",
+ "Number of cache client getAll requests.", "operations"),
+ statisticsFactory.createLongCounter("readGetAllRequestTime",
+ "Total time spent in reading getAll requests.", "nanoseconds"),
+ statisticsFactory.createLongCounter("processGetAllTime",
"Total time spent in processing a cache client getAll request.", "nanoseconds"),
- f.createIntCounter("getAllResponses",
+ statisticsFactory.createIntCounter("getAllResponses",
"Number of getAll responses written to the cache client.", "operations"),
- f.createLongCounter("writeGetAllResponseTime",
+ statisticsFactory.createLongCounter("writeGetAllResponseTime",
"Total time spent in writing getAll responses.", "nanoseconds"),
- f.createIntCounter("destroyRequests", "Number of cache client destroy requests.",
- "operations"),
- f.createLongCounter("readDestroyRequestTime",
+ statisticsFactory.createIntCounter("destroyRequests",
+ "Number of cache client destroy requests.", "operations"),
+ statisticsFactory.createLongCounter("readDestroyRequestTime",
"Total time spent in reading destroy requests.", "nanoseconds"),
- f.createLongCounter("processDestroyTime",
+ statisticsFactory.createLongCounter("processDestroyTime",
"Total time spent in processing a cache client destroy request, including the time to destroy an object from the cache.",
"nanoseconds"),
- f.createIntCounter("destroyResponses",
+ statisticsFactory.createIntCounter("destroyResponses",
"Number of destroy responses written to the cache client.", "operations"),
- f.createLongCounter("writeDestroyResponseTime",
+ statisticsFactory.createLongCounter("writeDestroyResponseTime",
"Total time spent in writing destroy responses.", "nanoseconds"),
- f.createIntCounter("invalidateRequests", "Number of cache client invalidate requests.",
- "operations"),
- f.createLongCounter("readInvalidateRequestTime",
+ statisticsFactory.createIntCounter("invalidateRequests",
+ "Number of cache client invalidate requests.", "operations"),
+ statisticsFactory.createLongCounter("readInvalidateRequestTime",
"Total time spent in reading invalidate requests.", "nanoseconds"),
- f.createLongCounter("processInvalidateTime",
+ statisticsFactory.createLongCounter("processInvalidateTime",
"Total time spent in processing a cache client invalidate request, including the time to invalidate an object from the cache.",
"nanoseconds"),
- f.createIntCounter("invalidateResponses",
+ statisticsFactory.createIntCounter("invalidateResponses",
"Number of invalidate responses written to the cache client.", "operations"),
- f.createLongCounter("writeInvalidateResponseTime",
+ statisticsFactory.createLongCounter("writeInvalidateResponseTime",
"Total time spent in writing invalidate responses.", "nanoseconds"),
- f.createIntCounter("sizeRequests", "Number of cache client size requests.", "operations"),
- f.createLongCounter("readSizeRequestTime", "Total time spent in reading size requests.",
- "nanoseconds"),
- f.createLongCounter("processSizeTime",
- "Total time spent in processing a cache client size request, including the time to size an object from the cache.",
- "nanoseconds"),
- f.createIntCounter("sizeResponses", "Number of size responses written to the cache client.",
+ statisticsFactory.createIntCounter("sizeRequests", "Number of cache client size requests.",
"operations"),
- f.createLongCounter("writeSizeResponseTime", "Total time spent in writing size responses.",
+ statisticsFactory.createLongCounter("readSizeRequestTime",
+ "Total time spent in reading size requests.", "nanoseconds"),
+ statisticsFactory.createLongCounter("processSizeTime",
+ "Total time spent in processing a cache client size request, including the time to size an object from the cache.",
"nanoseconds"),
+ statisticsFactory.createIntCounter("sizeResponses",
+ "Number of size responses written to the cache client.", "operations"),
+ statisticsFactory.createLongCounter("writeSizeResponseTime",
+ "Total time spent in writing size responses.", "nanoseconds"),
- f.createIntCounter("queryRequests", "Number of cache client query requests.", "operations"),
- f.createLongCounter("readQueryRequestTime", "Total time spent in reading query requests.",
- "nanoseconds"),
- f.createLongCounter("processQueryTime",
+ statisticsFactory.createIntCounter("queryRequests",
+ "Number of cache client query requests.", "operations"),
+ statisticsFactory.createLongCounter("readQueryRequestTime",
+ "Total time spent in reading query requests.", "nanoseconds"),
+ statisticsFactory.createLongCounter("processQueryTime",
"Total time spent in processing a cache client query request, including the time to destroy an object from the cache.",
"nanoseconds"),
- f.createIntCounter("queryResponses",
+ statisticsFactory.createIntCounter("queryResponses",
"Number of query responses written to the cache client.", "operations"),
- f.createLongCounter("writeQueryResponseTime",
+ statisticsFactory.createLongCounter("writeQueryResponseTime",
"Total time spent in writing query responses.", "nanoseconds"),
- f.createIntCounter("destroyRegionRequests",
+ statisticsFactory.createIntCounter("destroyRegionRequests",
"Number of cache client destroyRegion requests.", "operations"),
- f.createLongCounter("readDestroyRegionRequestTime",
+ statisticsFactory.createLongCounter("readDestroyRegionRequestTime",
"Total time spent in reading destroyRegion requests.", "nanoseconds"),
- f.createLongCounter("processDestroyRegionTime",
+ statisticsFactory.createLongCounter("processDestroyRegionTime",
"Total time spent in processing a cache client destroyRegion request, including the time to destroy the region from the cache.",
"nanoseconds"),
- f.createIntCounter("destroyRegionResponses",
+ statisticsFactory.createIntCounter("destroyRegionResponses",
"Number of destroyRegion responses written to the cache client.", "operations"),
- f.createLongCounter("writeDestroyRegionResponseTime",
+ statisticsFactory.createLongCounter("writeDestroyRegionResponseTime",
"Total time spent in writing destroyRegion responses.", "nanoseconds"),
- f.createIntCounter("containsKeyRequests", "Number of cache client containsKey requests.",
- "operations"),
- f.createLongCounter("readContainsKeyRequestTime",
+ statisticsFactory.createIntCounter("containsKeyRequests",
+ "Number of cache client containsKey requests.", "operations"),
+ statisticsFactory.createLongCounter("readContainsKeyRequestTime",
"Total time spent reading containsKey requests.", "nanoseconds"),
- f.createLongCounter("processContainsKeyTime",
+ statisticsFactory.createLongCounter("processContainsKeyTime",
"Total time spent processing a containsKey request.", "nanoseconds"),
- f.createIntCounter("containsKeyResponses",
+ statisticsFactory.createIntCounter("containsKeyResponses",
"Number of containsKey responses written to the cache client.", "operations"),
- f.createLongCounter("writeContainsKeyResponseTime",
+ statisticsFactory.createLongCounter("writeContainsKeyResponseTime",
"Total time spent writing containsKey responses.", "nanoseconds"),
- f.createIntCounter("processBatchRequests", "Number of cache client processBatch requests.",
- "operations"),
- f.createLongCounter("readProcessBatchRequestTime",
+ statisticsFactory.createIntCounter("processBatchRequests",
+ "Number of cache client processBatch requests.", "operations"),
+ statisticsFactory.createLongCounter("readProcessBatchRequestTime",
"Total time spent in reading processBatch requests.", "nanoseconds"),
- f.createLongCounter("processBatchTime",
+ statisticsFactory.createLongCounter("processBatchTime",
"Total time spent in processing a cache client processBatch request.", "nanoseconds"),
- f.createIntCounter("processBatchResponses",
+ statisticsFactory.createIntCounter("processBatchResponses",
"Number of processBatch responses written to the cache client.", "operations"),
- f.createLongCounter("writeProcessBatchResponseTime",
+ statisticsFactory.createLongCounter("writeProcessBatchResponseTime",
"Total time spent in writing processBatch responses.", "nanoseconds"),
- f.createLongCounter("batchSize", "The size of the batches received.", "bytes"),
- f.createIntCounter("clearRegionRequests", "Number of cache client clearRegion requests.",
- "operations"),
- f.createLongCounter("readClearRegionRequestTime",
+ statisticsFactory.createLongCounter("batchSize", "The size of the batches received.",
+ "bytes"),
+ statisticsFactory.createIntCounter("clearRegionRequests",
+ "Number of cache client clearRegion requests.", "operations"),
+ statisticsFactory.createLongCounter("readClearRegionRequestTime",
"Total time spent in reading clearRegion requests.", "nanoseconds"),
- f.createLongCounter("processClearRegionTime",
+ statisticsFactory.createLongCounter("processClearRegionTime",
"Total time spent in processing a cache client clearRegion request, including the time to clear the region from the cache.",
"nanoseconds"),
- f.createIntCounter("clearRegionResponses",
+ statisticsFactory.createIntCounter("clearRegionResponses",
"Number of clearRegion responses written to the cache client.", "operations"),
- f.createLongCounter("writeClearRegionResponseTime",
+ statisticsFactory.createLongCounter("writeClearRegionResponseTime",
"Total time spent in writing clearRegion responses.", "nanoseconds"),
- f.createIntCounter("clientNotificationRequests",
+ statisticsFactory.createIntCounter("clientNotificationRequests",
"Number of cache client notification requests.", "operations"),
- f.createLongCounter("readClientNotificationRequestTime",
+ statisticsFactory.createLongCounter("readClientNotificationRequestTime",
"Total time spent in reading client notification requests.", "nanoseconds"),
- f.createLongCounter("processClientNotificationTime",
+ statisticsFactory.createLongCounter("processClientNotificationTime",
"Total time spent in processing a cache client notification request.", "nanoseconds"),
- f.createIntCounter("updateClientNotificationRequests",
+ statisticsFactory.createIntCounter("updateClientNotificationRequests",
"Number of cache client notification update requests.", "operations"),
- f.createLongCounter("readUpdateClientNotificationRequestTime",
+ statisticsFactory.createLongCounter("readUpdateClientNotificationRequestTime",
"Total time spent in reading client notification update requests.", "nanoseconds"),
- f.createLongCounter("processUpdateClientNotificationTime",
+ statisticsFactory.createLongCounter("processUpdateClientNotificationTime",
"Total time spent in processing a client notification update request.", "nanoseconds"),
- f.createIntCounter("clientReadyRequests", "Number of cache client ready requests.",
- "operations"),
- f.createLongCounter("readClientReadyRequestTime",
+ statisticsFactory.createIntCounter("clientReadyRequests",
+ "Number of cache client ready requests.", "operations"),
+ statisticsFactory.createLongCounter("readClientReadyRequestTime",
"Total time spent in reading cache client ready requests.", "nanoseconds"),
- f.createLongCounter("processClientReadyTime",
+ statisticsFactory.createLongCounter("processClientReadyTime",
"Total time spent in processing a cache client ready request, including the time to destroy an object from the cache.",
"nanoseconds"),
- f.createIntCounter("clientReadyResponses",
+ statisticsFactory.createIntCounter("clientReadyResponses",
"Number of client ready responses written to the cache client.", "operations"),
- f.createLongCounter("writeClientReadyResponseTime",
+ statisticsFactory.createLongCounter("writeClientReadyResponseTime",
"Total time spent in writing client ready responses.", "nanoseconds"),
- f.createIntCounter("closeConnectionRequests",
+ statisticsFactory.createIntCounter("closeConnectionRequests",
"Number of cache client close connection requests.", "operations"),
- f.createLongCounter("readCloseConnectionRequestTime",
+ statisticsFactory.createLongCounter("readCloseConnectionRequestTime",
"Total time spent in reading close connection requests.", "nanoseconds"),
- f.createLongCounter("processCloseConnectionTime",
+ statisticsFactory.createLongCounter("processCloseConnectionTime",
"Total time spent in processing a cache client close connection request.",
"nanoseconds"),
- f.createIntCounter("failedConnectionAttempts", "Number of failed connection attempts.",
- "attempts"),
- f.createIntGauge("currentClientConnections",
+ statisticsFactory.createIntCounter("failedConnectionAttempts",
+ "Number of failed connection attempts.", "attempts"),
+ statisticsFactory.createIntGauge("currentClientConnections",
"Number of sockets accepted and used for client to server messaging.", "sockets"),
- f.createIntGauge("currentQueueConnections",
+ statisticsFactory.createIntGauge("currentQueueConnections",
"Number of sockets accepted and used for server to client messaging.", "sockets"),
- f.createIntGauge("currentClients", "Number of client virtual machines connected.",
- "clients"),
- f.createIntCounter("outOfOrderGatewayBatchIds", "Number of Out of order batch IDs.",
- "batches"),
- f.createIntCounter("abandonedWriteRequests",
+ statisticsFactory.createIntGauge("currentClients",
+ "Number of client virtual machines connected.", "clients"),
+ statisticsFactory.createIntCounter("outOfOrderGatewayBatchIds",
+ "Number of Out of order batch IDs.", "batches"),
+ statisticsFactory.createIntCounter("abandonedWriteRequests",
"Number of write opertations abandond by clients", "requests"),
- f.createIntCounter("abandonedReadRequests",
+ statisticsFactory.createIntCounter("abandonedReadRequests",
"Number of read opertations abandond by clients", "requests"),
- f.createLongCounter("receivedBytes", "Total number of bytes received from clients.",
+ statisticsFactory.createLongCounter("receivedBytes",
+ "Total number of bytes received from clients.", "bytes"),
+ statisticsFactory.createLongCounter("sentBytes", "Total number of bytes sent to clients.",
"bytes"),
- f.createLongCounter("sentBytes", "Total number of bytes sent to clients.", "bytes"),
- f.createIntGauge("messagesBeingReceived",
+ statisticsFactory.createIntGauge("messagesBeingReceived",
"Current number of message being received off the network or being processed after reception.",
"messages"),
- f.createLongGauge("messageBytesBeingReceived",
+ statisticsFactory.createLongGauge("messageBytesBeingReceived",
"Current number of bytes consumed by messages being received or processed.", "bytes"),
- f.createIntCounter("connectionsTimedOut",
+ statisticsFactory.createIntCounter("connectionsTimedOut",
"Total number of connections that have been timed out by the server because of client inactivity",
"connections"),
- f.createIntGauge("threadQueueSize",
+ statisticsFactory.createIntGauge("threadQueueSize",
"Current number of connections waiting for a thread to start processing their message.",
"connections"),
- f.createIntGauge("acceptsInProgress",
+ statisticsFactory.createIntGauge("acceptsInProgress",
"Current number of server accepts that are attempting to do the initial handshake with the client.",
"accepts"),
- f.createIntCounter("acceptThreadStarts",
+ statisticsFactory.createIntCounter("acceptThreadStarts",
"Total number of threads created to deal with an accepted socket. Note that this is not the current number of threads.",
"starts"),
- f.createIntCounter("connectionThreadStarts",
+ statisticsFactory.createIntCounter("connectionThreadStarts",
"Total number of threads created to deal with a client connection. Note that this is not the current number of threads.",
"starts"),
- f.createIntGauge("connectionThreads",
+ statisticsFactory.createIntGauge("connectionThreads",
"Current number of threads dealing with a client connection.", "threads"),
- f.createDoubleGauge("connectionLoad",
+ statisticsFactory.createDoubleGauge("connectionLoad",
"The load from client to server connections as reported by the load probe installed in this server",
"load"),
- f.createDoubleGauge("loadPerConnection",
+ statisticsFactory.createDoubleGauge("loadPerConnection",
"The estimate of how much load is added for each new connection as reported by the load probe installed in this server",
"load"),
- f.createDoubleGauge("queueLoad",
+ statisticsFactory.createDoubleGauge("queueLoad",
"The load from queues as reported by the load probe installed in this server", "load"),
- f.createDoubleGauge("loadPerQueue",
+ statisticsFactory.createDoubleGauge("loadPerQueue",
"The estimate of how much load is added for each new connection as reported by the load probe installed in this server",
"load")};
StatisticDescriptor[] alldescriptors = serverStatDescriptors;
- if (descriptiors != null) {
- alldescriptors = new StatisticDescriptor[descriptiors.length + serverStatDescriptors.length];
- System.arraycopy(descriptiors, 0, alldescriptors, 0, descriptiors.length);
- System.arraycopy(serverStatDescriptors, 0, alldescriptors, descriptiors.length,
+ if (descriptors != null) {
+ alldescriptors = new StatisticDescriptor[descriptors.length + serverStatDescriptors.length];
+ System.arraycopy(descriptors, 0, alldescriptors, 0, descriptors.length);
+ System.arraycopy(serverStatDescriptors, 0, alldescriptors, descriptors.length,
serverStatDescriptors.length);
}
- statType = f.createType(typeName, typeName, alldescriptors);
- try {
- ownerName = SocketCreator.getLocalHost().getCanonicalHostName() + "-" + ownerName;
- } catch (Exception e) {
- }
- this.stats = f.createAtomicStatistics(statType, ownerName);
+ statType = statisticsFactory.createType(typeName, typeName, alldescriptors);
+ this.stats = statisticsFactory.createAtomicStatistics(statType, ownerName);
getRequestsId = this.stats.nameToId("getRequests");
readGetRequestTimeId = this.stats.nameToId("readGetRequestTime");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
index 0ced3aa..b8969e1 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
@@ -19,6 +19,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsFactory;
+
/**
* This is an interface that other modules can implement to hook into
@@ -29,6 +32,10 @@ import java.io.OutputStream;
* {@link GenericProtocolServerConnection}.
*/
public interface ClientProtocolMessageHandler {
+ void initializeStatistics(String statisticsName, StatisticsFactory factory);
+
+ ClientProtocolStatistics getStatistics();
+
void receiveMessage(InputStream inputStream, OutputStream outputStream,
MessageExecutionContext executionContext) throws IOException;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolStatistics.java
similarity index 56%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolStatistics.java
index 0ced3aa..3c3acbe 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolStatistics.java
@@ -12,23 +12,13 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-
/**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
+ * Implementations of this interface record statistics for the corresponding client/server protocol.
*/
-public interface ClientProtocolMessageHandler {
- void receiveMessage(InputStream inputStream, OutputStream outputStream,
- MessageExecutionContext executionContext) throws IOException;
+public interface ClientProtocolStatistics {
+ public void clientConnected();
+
+ public void clientDisconnected();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
index 6c81028..767b6c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -36,20 +36,23 @@ public class GenericProtocolServerConnection extends ServerConnection {
private final ClientProtocolMessageHandler messageHandler;
private final SecurityManager securityManager;
private final Authenticator authenticator;
+ private boolean cleanedUp;
/**
* Creates a new <code>GenericProtocolServerConnection</code> that processes messages received
* from an edge client over a given <code>Socket</code>.
*/
- public GenericProtocolServerConnection(Socket s, InternalCache c, CachedRegionHelper helper,
- CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
- byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler newClientProtocol,
- SecurityService securityService, Authenticator authenticator) {
- super(s, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr, communicationMode,
- acceptor, securityService);
+ public GenericProtocolServerConnection(Socket socket, InternalCache cache,
+ CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
+ String communicationModeStr, byte communicationMode, Acceptor acceptor,
+ ClientProtocolMessageHandler newClientProtocol, SecurityService securityService,
+ Authenticator authenticator) {
+ super(socket, cache, helper, stats, hsTimeout, socketBufferSize, communicationModeStr,
+ communicationMode, acceptor, securityService);
securityManager = securityService.getSecurityManager();
this.messageHandler = newClientProtocol;
this.authenticator = authenticator;
+ this.messageHandler.getStatistics().clientConnected();
}
@Override
@@ -62,8 +65,8 @@ public class GenericProtocolServerConnection extends ServerConnection {
if (!authenticator.isAuthenticated()) {
authenticator.authenticate(inputStream, outputStream, securityManager);
} else {
- messageHandler.receiveMessage(inputStream, outputStream,
- new MessageExecutionContext(this.getCache(), authenticator.getAuthorizer()));
+ messageHandler.receiveMessage(inputStream, outputStream, new MessageExecutionContext(
+ this.getCache(), authenticator.getAuthorizer(), messageHandler.getStatistics()));
}
} catch (EOFException e) {
this.setFlagProcessMessagesAsFalse();
@@ -76,6 +79,17 @@ public class GenericProtocolServerConnection extends ServerConnection {
}
@Override
+ public boolean cleanup() {
+ synchronized (this) { // the flag is checked and set while holding this connection's monitor
+ if (!cleanedUp) { // record the disconnect at most once, even if cleanup() is called again
+ cleanedUp = true;
+ messageHandler.getStatistics().clientDisconnected();
+ }
+ }
+ return super.cleanup(); // superclass cleanup runs on every call; its result is returned
+ }
+
+ @Override
protected boolean doHandShake(byte epType, int qSize) {
// no handshake for new client protocol.
return true;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
index 1130ce7..d1fc461 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
@@ -28,10 +28,14 @@ public class MessageExecutionContext {
private Cache cache;
private Locator locator;
private Authorizer authorizer;
+ private ClientProtocolStatistics statistics;
- public MessageExecutionContext(Cache cache, Authorizer streamAuthorizer) {
+
+ public MessageExecutionContext(Cache cache, Authorizer streamAuthorizer,
+ ClientProtocolStatistics statistics) {
this.cache = cache;
this.authorizer = streamAuthorizer;
+ this.statistics = statistics;
}
public MessageExecutionContext(InternalLocator locator) {
@@ -39,6 +43,7 @@ public class MessageExecutionContext {
// set a no-op authorizer until such time as locators implement authentication
// and authorization checks
this.authorizer = new NoOpAuthorizer();
+ this.statistics = new NoOpStatistics();
}
/**
@@ -76,4 +81,16 @@ public class MessageExecutionContext {
public Authorizer getAuthorizer() {
return authorizer;
}
+
+ /**
+ * Returns the statistics for recording operation stats. In a unit test environment this may not
+ * be a protocol-specific statistics implementation.
+ *
+ * @return the statistics recorder held by this execution context
+ */
+ public ClientProtocolStatistics getStatistics() {
+ return statistics;
+ }
+
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
similarity index 56%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
index 0ced3aa..d04db47 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
@@ -12,23 +12,16 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+public class NoOpStatistics implements ClientProtocolStatistics {
+ @Override
+ public void clientConnected() {
+ }
-/**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
- */
-public interface ClientProtocolMessageHandler {
- void receiveMessage(InputStream inputStream, OutputStream outputStream,
- MessageExecutionContext executionContext) throws IOException;
+ @Override
+ public void clientDisconnected() {
+
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index f0348c3..31eab49 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -17,6 +17,7 @@ package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
+import org.apache.geode.StatisticsFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
@@ -24,6 +25,7 @@ import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.security.server.Authenticator;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
@@ -49,12 +51,14 @@ public class ServerConnectionFactory {
}
}
- private synchronized ClientProtocolMessageHandler initializeMessageHandler() {
+ private synchronized ClientProtocolMessageHandler initializeMessageHandler(
+ StatisticsFactory statisticsFactory, String statisticsName) {
if (protocolHandler != null) {
return protocolHandler;
}
protocolHandler = new MessageHandlerFactory().makeMessageHandler();
+ protocolHandler.initializeStatistics(statisticsName, statisticsFactory);
return protocolHandler;
}
@@ -78,17 +82,18 @@ public class ServerConnectionFactory {
}
}
- private ClientProtocolMessageHandler getClientProtocolMessageHandler() {
+ private ClientProtocolMessageHandler getOrCreateClientProtocolMessageHandler(
+ StatisticsFactory statisticsFactory, Acceptor acceptor) {
if (protocolHandler == null) {
- initializeMessageHandler();
+ return initializeMessageHandler(statisticsFactory, acceptor.getServerName());
}
return protocolHandler;
}
- public ServerConnection makeServerConnection(Socket s, InternalCache c, CachedRegionHelper helper,
- CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
- byte communicationMode, Acceptor acceptor, SecurityService securityService)
- throws IOException {
+ public ServerConnection makeServerConnection(Socket socket, InternalCache cache,
+ CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
+ String communicationModeStr, byte communicationMode, Acceptor acceptor,
+ SecurityService securityService, InetAddress bindAddress) throws IOException {
if (communicationMode == ProtobufClientServerProtocol.getModeNumber()) {
if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
throw new IOException("Server received unknown communication mode: " + communicationMode);
@@ -96,12 +101,13 @@ public class ServerConnectionFactory {
String authenticationMode =
System.getProperty("geode.protocol-authentication-mode", "NOOP");
- return new GenericProtocolServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize,
- communicationModeStr, communicationMode, acceptor, getClientProtocolMessageHandler(),
+ return new GenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
+ socketBufferSize, communicationModeStr, communicationMode, acceptor,
+ getOrCreateClientProtocolMessageHandler(cache.getDistributedSystem(), acceptor),
securityService, findStreamAuthenticator(authenticationMode));
}
} else {
- return new LegacyServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize,
+ return new LegacyServerConnection(socket, cache, helper, stats, hsTimeout, socketBufferSize,
communicationModeStr, communicationMode, acceptor, securityService);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index 09c5949..56d3770 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -107,7 +107,7 @@ public class ServerConnectionFactoryTest {
return new ServerConnectionFactory().makeServerConnection(socketMock, mock(InternalCache.class),
mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "", communicationMode,
- mock(AcceptorImpl.class), mock(SecurityService.class));
+ mock(AcceptorImpl.class), mock(SecurityService.class), InetAddress.getLocalHost());
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index bd23223..a4ebbac 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -82,9 +82,9 @@ public class ServerConnectionTest {
InternalCache cache = mock(InternalCache.class);
SecurityService securityService = mock(SecurityService.class);
- serverConnection =
- new ServerConnectionFactory().makeServerConnection(socket, cache, null, null, 0, 0, null,
- CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
+ serverConnection = new ServerConnectionFactory().makeServerConnection(socket, cache, null, null,
+ 0, 0, null, CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor,
+ securityService, InetAddress.getLocalHost());
MockitoAnnotations.initMocks(this);
}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
index 7dee26b..e5b4ae4 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufOpsProcessor.java
@@ -19,6 +19,7 @@ import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatistics;
import org.apache.geode.protocol.protobuf.utilities.ProtobufResponseUtilities;
import org.apache.geode.serialization.SerializationService;
@@ -49,6 +50,7 @@ public class ProtobufOpsProcessor {
result = operationContext.getOperationHandler().process(serializationService,
operationContext.getFromRequest().apply(request), context);
} else {
+ recordAuthorizationViolation(context);
result = Failure.of(ProtobufResponseUtilities.makeErrorResponse(
ProtocolErrorCode.AUTHORIZATION_FAILED.codeValue,
"User isn't authorized for this operation."));
@@ -63,4 +65,9 @@ public class ProtobufOpsProcessor {
operationContext.getToErrorResponse());
return builder.build();
}
+
+  /**
+   * Counts an authorization violation on the statistics carried by the execution context.
+   * Nothing is recorded when the context's statistics object is not a
+   * {@link ProtobufClientStatistics}, so a context built with a different implementation cannot
+   * cause a {@link ClassCastException} here.
+   *
+   * @param context execution context of the request that failed the authorization check
+   */
+  private void recordAuthorizationViolation(MessageExecutionContext context) {
+    if (context.getStatistics() instanceof ProtobufClientStatistics) {
+      ((ProtobufClientStatistics) context.getStatistics()).incAuthorizationViolations();
+    }
+  }
}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
index f28c310..717365b 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessor.java
@@ -19,13 +19,17 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.geode.StatisticsFactory;
import org.apache.geode.annotations.Experimental;
import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolStatistics;
import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
import org.apache.geode.protocol.protobuf.registry.OperationContextRegistry;
import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatistics;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatisticsImpl;
import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
@@ -38,6 +42,7 @@ import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredF
public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
private final ProtobufProtocolSerializer protobufProtocolSerializer;
private final ProtobufOpsProcessor protobufOpsProcessor;
+ private ProtobufClientStatistics statistics;
public ProtobufStreamProcessor() throws CodecAlreadyRegisteredForTypeException {
protobufProtocolSerializer = new ProtobufProtocolSerializer();
@@ -46,6 +51,16 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
}
@Override
+ public void initializeStatistics(String statisticsName, StatisticsFactory factory) {
+ statistics = new ProtobufClientStatisticsImpl(factory, statisticsName, "ProtobufServerStats");
+ }
+
+ @Override
+ public ClientProtocolStatistics getStatistics() {
+ return statistics;
+ }
+
+ @Override
public void receiveMessage(InputStream inputStream, OutputStream outputStream,
MessageExecutionContext executionContext) throws IOException {
try {
@@ -62,6 +77,7 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
if (message == null) {
throw new EOFException("Tried to deserialize protobuf message at EOF");
}
+ statistics.messageReceived(message.getSerializedSize());
ClientProtocol.Request request = message.getRequest();
ClientProtocol.Response response = protobufOpsProcessor.process(request, executionContext);
@@ -69,6 +85,7 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
ProtobufUtilities.createMessageHeaderForRequest(message);
ClientProtocol.Message responseMessage =
ProtobufUtilities.createProtobufResponse(responseHeader, response);
+ statistics.messageSent(responseMessage.getSerializedSize());
protobufProtocolSerializer.serialize(responseMessage, outputStream);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatistics.java
similarity index 52%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatistics.java
index 0ced3aa..f769c31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatistics.java
@@ -12,23 +12,20 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
+package org.apache.geode.protocol.protobuf.statistics;
-package org.apache.geode.internal.cache.tier.sockets;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolStatistics;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+public interface ProtobufClientStatistics extends ClientProtocolStatistics {
+ public void clientConnected();
+ public void clientDisconnected();
-/**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
- */
-public interface ClientProtocolMessageHandler {
- void receiveMessage(InputStream inputStream, OutputStream outputStream,
- MessageExecutionContext executionContext) throws IOException;
+ public void messageReceived(int bytes);
+
+ public void messageSent(int bytes);
+
+ public void incAuthorizationViolations();
+
+ public void incAuthenticationFailures();
}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatisticsImpl.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatisticsImpl.java
new file mode 100644
index 0000000..92f0610
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/statistics/ProtobufClientStatisticsImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.protocol.protobuf.statistics;
+
+import org.apache.geode.StatisticDescriptor;
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.StatisticsType;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatistics;
+
+/**
+ * {@link ProtobufClientStatistics} implementation that registers a statistics type with the
+ * supplied {@link StatisticsFactory} and records connection, traffic and security events on a
+ * single {@link Statistics} instance obtained from {@code createAtomicStatistics}.
+ */
+public class ProtobufClientStatisticsImpl implements ProtobufClientStatistics {
+  private final StatisticsType statType;
+  private final Statistics stats;
+  private final int currentClientConnectionsId;
+  private final int clientConnectionTerminationsId;
+  private final int clientConnectionStartsId;
+  private final int bytesReceivedId;
+  private final int bytesSentId;
+  private final int messagesReceivedId;
+  private final int messagesSentId;
+  private final int authorizationViolationsId;
+  private final int authenticationFailuresId;
+
+  /**
+   * Creates the statistics type and the statistics instance that back this recorder, then
+   * resolves the id of every statistic by name.
+   *
+   * @param statisticsFactory factory used to create the descriptors, the type and the instance
+   * @param statisticsName name given to the created statistics instance
+   * @param typeName name of the statistics type to create
+   */
+  public ProtobufClientStatisticsImpl(StatisticsFactory statisticsFactory, String statisticsName,
+      String typeName) {
+    StatisticDescriptor[] serverStatDescriptors = new StatisticDescriptor[] {
+        statisticsFactory.createIntGauge("currentClientConnections",
+            "Number of sockets accepted and used for client to server messaging.", "sockets"),
+        statisticsFactory.createIntCounter("clientConnectionStarts",
+            "Total number of sockets accepted for client to server messaging.", "sockets"),
+        statisticsFactory.createIntCounter("clientConnectionTerminations",
+            "Number of sockets that were used for client to server messaging.", "sockets"),
+        statisticsFactory.createLongCounter("authenticationFailures", "Authentication failures",
+            "attempts"),
+        statisticsFactory.createLongCounter("authorizationViolations",
+            "Operations not allowed to proceed", "operations"),
+        statisticsFactory.createLongCounter("bytesReceived",
+            "Bytes received from client messaging.", "bytes"),
+        statisticsFactory.createLongCounter("bytesSent", "Bytes sent for client messaging.",
+            "bytes"),
+        statisticsFactory.createLongCounter("messagesReceived", "Messages received from clients.",
+            "messages"),
+        statisticsFactory.createLongCounter("messagesSent", "Messages sent to clients.",
+            "messages")};
+    statType = statisticsFactory.createType(typeName, "Protobuf client/server statistics",
+        serverStatDescriptors);
+    this.stats = statisticsFactory.createAtomicStatistics(statType, statisticsName);
+    currentClientConnectionsId = this.stats.nameToId("currentClientConnections");
+    clientConnectionStartsId = this.stats.nameToId("clientConnectionStarts");
+    clientConnectionTerminationsId = this.stats.nameToId("clientConnectionTerminations");
+    authorizationViolationsId = this.stats.nameToId("authorizationViolations");
+    authenticationFailuresId = this.stats.nameToId("authenticationFailures");
+    bytesReceivedId = this.stats.nameToId("bytesReceived");
+    bytesSentId = this.stats.nameToId("bytesSent");
+    // Each message counter must resolve to its own descriptor; looking up the byte counters here
+    // would leave messagesReceived/messagesSent at zero and inflate the byte totals.
+    messagesReceivedId = this.stats.nameToId("messagesReceived");
+    messagesSentId = this.stats.nameToId("messagesSent");
+  }
+
+  /** Adds one to the current-connections gauge and to the connection-starts counter. */
+  @Override
+  public void clientConnected() {
+    stats.incInt(currentClientConnectionsId, 1);
+    stats.incInt(clientConnectionStartsId, 1);
+  }
+
+  /** Subtracts one from the current-connections gauge and counts one termination. */
+  @Override
+  public void clientDisconnected() {
+    stats.incInt(currentClientConnectionsId, -1);
+    stats.incInt(clientConnectionTerminationsId, 1);
+  }
+
+  /**
+   * Records one received message.
+   *
+   * @param bytes amount added to the bytes-received counter
+   */
+  @Override
+  public void messageReceived(int bytes) {
+    stats.incLong(bytesReceivedId, bytes);
+    stats.incLong(messagesReceivedId, 1);
+  }
+
+  /**
+   * Records one sent message.
+   *
+   * @param bytes amount added to the bytes-sent counter
+   */
+  @Override
+  public void messageSent(int bytes) {
+    stats.incLong(bytesSentId, bytes);
+    stats.incLong(messagesSentId, 1);
+  }
+
+  /** Adds one to the authorization-violations counter. */
+  @Override
+  public void incAuthorizationViolations() {
+    stats.incLong(authorizationViolationsId, 1);
+  }
+
+  /** Adds one to the authentication-failures counter. */
+  @Override
+  public void incAuthenticationFailures() {
+    stats.incLong(authenticationFailuresId, 1);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
similarity index 88%
rename from geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
rename to geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
index be9c4a2..d52223c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
@@ -24,6 +24,7 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
import org.apache.geode.security.server.NoOpAuthenticator;
import org.apache.geode.test.junit.categories.UnitTest;
@@ -56,8 +57,10 @@ public class GenericProtocolServerConnectionTest {
Socket socketMock = mock(Socket.class);
when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
+ ClientProtocolMessageHandler mockHandler = mock(ClientProtocolMessageHandler.class);
+ when(mockHandler.getStatistics()).thenReturn(new NoOpProtobufStatistics());
GenericProtocolServerConnection genericProtocolServerConnection =
- getGenericProtocolServerConnection(socketMock, mock(ClientProtocolMessageHandler.class));
+ getGenericProtocolServerConnection(socketMock, mockHandler);
genericProtocolServerConnection.emergencyClose();
@@ -69,6 +72,8 @@ public class GenericProtocolServerConnectionTest {
when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
ClientProtocolMessageHandler clientProtocolMock = mock(ClientProtocolMessageHandler.class);
+ ClientProtocolStatistics statisticsMock = mock(ClientProtocolStatistics.class);
+ when(clientProtocolMock.getStatistics()).thenReturn(statisticsMock);
doThrow(new IOException()).when(clientProtocolMock).receiveMessage(any(), any(), any());
return getGenericProtocolServerConnection(socketMock, clientProtocolMock);
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
index cc15e4f..4a6b44a 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripCacheConnectionJUnitTest.java
@@ -22,15 +22,18 @@ import static org.apache.geode.distributed.ConfigurationProperties.SSL_KEYSTORE_
import static org.apache.geode.distributed.ConfigurationProperties.SSL_REQUIRE_AUTHENTICATION;
import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE;
import static org.apache.geode.distributed.ConfigurationProperties.SSL_TRUSTSTORE_PASSWORD;
+import static org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase.disconnectAllFromDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.util.Collection;
import java.util.HashSet;
+import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -64,6 +67,7 @@ import org.apache.geode.internal.net.SocketCreatorFactory;
import org.apache.geode.internal.protocol.protobuf.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.RegionAPI;
+import org.apache.geode.internal.statistics.StatArchiveReader;
import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
import org.apache.geode.protocol.protobuf.ProtobufSerializationService;
import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
@@ -111,6 +115,7 @@ public class RoundTripCacheConnectionJUnitTest {
@Rule
public TestName testName = new TestName();
+ private File statisticsArchiveFile;
@Before
public void setup() throws Exception {
@@ -126,6 +131,12 @@ public class RoundTripCacheConnectionJUnitTest {
cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false");
cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false");
+ cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLING_ENABLED, "true");
+ cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLE_RATE, "100");
+ statisticsArchiveFile =
+ new File(getClass().getSimpleName() + "_" + testName.getMethodName() + ".gfs");
+ cacheFactory.set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE,
+ statisticsArchiveFile.getName());
cache = cacheFactory.create();
CacheServer cacheServer = cache.addCacheServer();
@@ -208,6 +219,51 @@ public class RoundTripCacheConnectionJUnitTest {
ProtobufUtilities.createProtobufRequestWithGetAllRequest(getAllRequest));
protobufProtocolSerializer.serialize(getAllMessage, outputStream);
validateGetAllResponse(socket, protobufProtocolSerializer);
+ long startTime = System.currentTimeMillis();
+ Thread.sleep(3000);
+
+ long endTime = System.currentTimeMillis();
+
+ disconnectAllFromDS();
+
+ // Only the resource type is restricted: every archive, statistic name and instance is accepted,
+ // while the type must equal "ProtobufServerStats".
+ StatArchiveReader.ValueFilter filter = new StatArchiveReader.ValueFilter() {
+   @Override
+   public boolean typeMatches(String typeName) {
+     return typeName.equals("ProtobufServerStats");
+   }
+
+   @Override
+   public boolean archiveMatches(File archiveFile) {
+     return true;
+   }
+
+   @Override
+   public boolean instanceMatches(String instanceTextId, long instanceNumericId) {
+     return true;
+   }
+
+   @Override
+   public boolean statMatches(String statisticName) {
+     return true;
+   }
+ };
+
+ StatArchiveReader reader = new StatArchiveReader(new File[] {statisticsArchiveFile},
+ new StatArchiveReader.ValueFilter[] {filter}, true);
+ List resourceInstList = reader.getResourceInstList();
+ // for (Object inst : resourceInstList) {
+ // StatArchiveReader.ResourceInst ri = (StatArchiveReader.ResourceInst) inst;
+ // String resourceName = ri.getName();
+ // String resourceTypeName = ri.getType().getName();
+ // System.out.println("===> resource name: " + resourceName + "; type name: " +
+ // resourceTypeName);
+ // }
+ assertEquals(1, resourceInstList.size());
+ StatArchiveReader.ResourceInst resourceInst =
+ (StatArchiveReader.ResourceInst) resourceInstList.iterator().next();
+ StatArchiveReader.StatValue statValue =
+ resourceInst.getStatValue("currentClientConnections").createTrimmed(startTime, endTime);
+ assertEquals(2.0, statValue.getSnapshotsMinimum(), 0.01);
}
@Test
@@ -297,6 +353,10 @@ public class RoundTripCacheConnectionJUnitTest {
CacheFactory cacheFactory = new CacheFactory();
cacheFactory.set(ConfigurationProperties.LOCATORS, "");
cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
+ cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLING_ENABLED, "true");
+ cacheFactory.set(ConfigurationProperties.STATISTIC_SAMPLE_RATE, "100");
+ cacheFactory.set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE,
+ getClass().getSimpleName() + "_" + testName.getMethodName() + ".gfs");
cache = cacheFactory.create();
CacheServer cacheServer = cache.addCacheServer();
@@ -326,6 +386,7 @@ public class RoundTripCacheConnectionJUnitTest {
assertEquals(-1, socket.getInputStream().read()); // EOF implies disconnected.
}
+ Thread.sleep(15000);
for (Socket currentSocket : sockets) {
currentSocket.close();
}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionDUnitTest.java
index 5b627b8..9603caf 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionDUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/RoundTripLocatorConnectionDUnitTest.java
@@ -19,6 +19,7 @@ import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufCli
import static org.junit.Assert.assertEquals;
import java.io.DataOutputStream;
+import java.io.File;
import java.io.IOException;
import java.net.Socket;
@@ -29,7 +30,9 @@ import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.Locator;
+import org.apache.geode.internal.Config;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.protocol.exception.InvalidProtocolMessageException;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
@@ -51,6 +54,7 @@ import org.junit.experimental.categories.Category;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
+import java.util.Properties;
@Category(DistributedTest.class)
public class RoundTripLocatorConnectionDUnitTest extends JUnit4CacheTestCase {
@@ -129,6 +133,20 @@ public class RoundTripLocatorConnectionDUnitTest extends JUnit4CacheTestCase {
messageResponse.getErrorResponse().getError().getErrorCode());
}
+ /**
+  * Adds statistics settings to the inherited distributed-system properties: sampling is enabled,
+  * the sample rate is set to "100", and the archive file is set to {@code getUniqueName()} with a
+  * ".gfs" suffix.
+  *
+  * <p>
+  * If a file with that archive name already exists it is deleted before the properties are
+  * returned.
+  *
+  * @return the inherited properties with the statistics settings added
+  */
+ @Override
+ public Properties getDistributedSystemProperties() {
+   Properties properties = super.getDistributedSystemProperties();
+   // setProperty is the String-typed way to populate a Properties object; the inherited
+   // Hashtable#put would also accept non-String keys and values.
+   properties.setProperty(ConfigurationProperties.STATISTIC_SAMPLING_ENABLED, "true");
+   properties.setProperty(ConfigurationProperties.STATISTIC_SAMPLE_RATE, "100");
+   String statFileName = getUniqueName() + ".gfs";
+   properties.setProperty(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statFileName);
+   File statFile = new File(statFileName);
+   if (statFile.exists()) {
+     // The result of delete() is ignored, so a failed delete goes unnoticed here.
+     statFile.delete();
+   }
+   return properties;
+ }
+
private Integer startCacheWithCacheServer() throws IOException {
System.setProperty("geode.feature-protobuf-protocol", "true");
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
index 50d7b40..4806297 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufStreamProcessorTest.java
@@ -27,6 +27,7 @@ import org.junit.experimental.categories.Category;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
import org.apache.geode.security.server.NoOpAuthorizer;
import org.apache.geode.test.junit.categories.UnitTest;
@@ -39,7 +40,7 @@ public class ProtobufStreamProcessorTest {
ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor();
InternalCache mockInternalCache = mock(InternalCache.class);
- protobufStreamProcessor.receiveMessage(inputStream, outputStream,
- new MessageExecutionContext(mockInternalCache, new NoOpAuthorizer()));
+ protobufStreamProcessor.receiveMessage(inputStream, outputStream, new MessageExecutionContext(
+ mockInternalCache, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
}
}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
index 0850f25..91a6336 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetAllRequestOperationHandlerJUnitTest.java
@@ -39,6 +39,7 @@ import org.apache.geode.internal.protocol.protobuf.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Result;
import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.security.server.NoOpAuthorizer;
@@ -80,9 +81,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
public void processReturnsExpectedValuesForValidKeys()
throws CodecAlreadyRegisteredForTypeException, UnsupportedEncodingTypeException,
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
- Result<RegionAPI.GetAllResponse> result =
- operationHandler.process(serializationServiceStub, generateTestRequest(true, false),
- new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
+ generateTestRequest(true, false),
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Success);
@@ -101,9 +102,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
@Test
public void processReturnsNoEntriesForNoKeysRequested() throws UnsupportedEncodingTypeException,
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
- Result<RegionAPI.GetAllResponse> result =
- operationHandler.process(serializationServiceStub, generateTestRequest(false, false),
- new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
+ generateTestRequest(false, false),
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Success);
@@ -120,7 +121,8 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
RegionAPI.GetAllRequest getAllRequest =
ProtobufRequestUtilities.createGetAllRequest(TEST_REGION, testKeys);
Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
- getAllRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ getAllRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Success);
RegionAPI.GetAllResponse message = result.getMessage();
@@ -134,9 +136,9 @@ public class GetAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
@Test
public void multipleKeysWhereOneThrows() throws UnsupportedEncodingTypeException,
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
- Result<RegionAPI.GetAllResponse> result =
- operationHandler.process(serializationServiceStub, generateTestRequest(true, true),
- new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ Result<RegionAPI.GetAllResponse> result = operationHandler.process(serializationServiceStub,
+ generateTestRequest(true, true),
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Success);
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
index 0d09148..10bd5c6 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionNamesRequestOperationHandlerJUnitTest.java
@@ -32,6 +32,7 @@ import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Result;
import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.security.server.NoOpAuthorizer;
import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException;
@@ -68,7 +69,7 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
Result<RegionAPI.GetRegionNamesResponse> result = operationHandler.process(
serializationServiceStub, ProtobufRequestUtilities.createGetRegionNamesRequest(),
- new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
Assert.assertTrue(result instanceof Success);
RegionAPI.GetRegionNamesResponse getRegionsResponse = result.getMessage();
@@ -92,9 +93,10 @@ public class GetRegionNamesRequestOperationHandlerJUnitTest extends OperationHan
Cache emptyCache = mock(Cache.class);;
when(emptyCache.rootRegions())
.thenReturn(Collections.unmodifiableSet(new HashSet<Region<String, String>>()));
- Result<RegionAPI.GetRegionNamesResponse> result = operationHandler.process(
- serializationServiceStub, ProtobufRequestUtilities.createGetRegionNamesRequest(),
- new MessageExecutionContext(emptyCache, new NoOpAuthorizer()));
+ Result<RegionAPI.GetRegionNamesResponse> result =
+ operationHandler.process(serializationServiceStub,
+ ProtobufRequestUtilities.createGetRegionNamesRequest(), new MessageExecutionContext(
+ emptyCache, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
Assert.assertTrue(result instanceof Success);
RegionAPI.GetRegionNamesResponse getRegionsResponse = result.getMessage();
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java
index 3458e21..ee63cd2 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRegionRequestOperationHandlerJUnitTest.java
@@ -14,6 +14,16 @@
*/
package org.apache.geode.protocol.protobuf.operations;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.Region;
@@ -21,28 +31,20 @@ import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.protocol.MessageUtil;
import org.apache.geode.internal.protocol.protobuf.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
+import org.apache.geode.protocol.MessageUtil;
import org.apache.geode.protocol.protobuf.Failure;
import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Result;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
import org.apache.geode.security.server.NoOpAuthorizer;
import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException;
import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.util.Collections;
-import java.util.HashSet;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
@Category(UnitTest.class)
public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -77,7 +79,7 @@ public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJ
Result<RegionAPI.GetRegionResponse> result = operationHandler.process(serializationServiceStub,
MessageUtil.makeGetRegionRequest(TEST_REGION1),
- new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
RegionAPI.GetRegionResponse response = result.getMessage();
BasicTypes.Region region = response.getRegion();
Assert.assertEquals(TEST_REGION1, region.getName());
@@ -102,8 +104,8 @@ public class GetRegionRequestOperationHandlerJUnitTest extends OperationHandlerJ
.thenReturn(Collections.unmodifiableSet(new HashSet<Region<String, String>>()));
String unknownRegionName = "UNKNOWN_REGION";
Result<RegionAPI.GetRegionResponse> result = operationHandler.process(serializationServiceStub,
- MessageUtil.makeGetRegionRequest(unknownRegionName),
- new MessageExecutionContext(emptyCache, new NoOpAuthorizer()));
+ MessageUtil.makeGetRegionRequest(unknownRegionName), new MessageExecutionContext(emptyCache,
+ new NoOpAuthorizer(), new NoOpProtobufStatistics()));
Assert.assertTrue(result instanceof Failure);
Assert.assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
result.getErrorMessage().getError().getErrorCode());
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java
index 5bdd5d7..9b4109c 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/GetRequestOperationHandlerJUnitTest.java
@@ -14,16 +14,27 @@
*/
package org.apache.geode.protocol.protobuf.operations;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.UnsupportedEncodingException;
+
import com.google.protobuf.ByteString;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.protobuf.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Failure;
import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Result;
import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.security.server.NoOpAuthorizer;
@@ -32,15 +43,6 @@ import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredF
import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.UnsupportedEncodingException;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
@Category(UnitTest.class)
public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -73,7 +75,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
RegionAPI.GetRequest getRequest = generateTestRequest(false, false, false);
Result<RegionAPI.GetResponse> result = operationHandler.process(serializationServiceStub,
- getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ getRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
Assert.assertTrue(result instanceof Success);
Assert.assertEquals(BasicTypes.EncodedValue.ValueCase.STRINGRESULT,
@@ -88,7 +91,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
RegionAPI.GetRequest getRequest = generateTestRequest(true, false, false);
Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
- getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ getRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
Assert.assertTrue(response instanceof Failure);
Assert.assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
@@ -101,7 +105,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
RegionAPI.GetRequest getRequest = generateTestRequest(false, true, false);
Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
- getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ getRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
Assert.assertTrue(response instanceof Success);
}
@@ -112,7 +117,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
RegionAPI.GetRequest getRequest = generateTestRequest(false, false, true);
Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
- getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ getRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
Assert.assertTrue(response instanceof Success);
}
@@ -134,7 +140,8 @@ public class GetRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
RegionAPI.GetRequest getRequest =
ProtobufRequestUtilities.createGetRequest(TEST_REGION, encodedKey).getGetRequest();
Result<RegionAPI.GetResponse> response = operationHandler.process(serializationServiceStub,
- getRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ getRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
Assert.assertTrue(response instanceof Failure);
Assert.assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue,
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
index 36a5cf4..1c735e1 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutAllRequestOperationHandlerJUnitTest.java
@@ -14,6 +14,21 @@
*/
package org.apache.geode.protocol.protobuf.operations;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
@@ -21,6 +36,7 @@ import org.apache.geode.internal.protocol.protobuf.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Result;
import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.security.server.NoOpAuthorizer;
@@ -29,20 +45,6 @@ import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredF
import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
import org.apache.geode.test.dunit.Assert;
import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
@Category(UnitTest.class)
public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -74,9 +76,9 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
CodecAlreadyRegisteredForTypeException, InvalidExecutionContextException {
PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler();
- Result<RegionAPI.PutAllResponse> result =
- operationHandler.process(serializationServiceStub, generateTestRequest(false, true),
- new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
+ generateTestRequest(false, true),
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
Assert.assertTrue(result instanceof Success);
@@ -89,9 +91,9 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
public void processWithInvalidEntrySucceedsAndReturnsFailedKey() throws Exception {
PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler();
- Result<RegionAPI.PutAllResponse> result =
- operationHandler.process(serializationServiceStub, generateTestRequest(true, true),
- new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
+ generateTestRequest(true, true),
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Success);
verify(regionMock).put(TEST_KEY1, TEST_VALUE1);
@@ -109,9 +111,9 @@ public class PutAllRequestOperationHandlerJUnitTest extends OperationHandlerJUni
public void processWithNoEntriesPasses() throws Exception {
PutAllRequestOperationHandler operationHandler = new PutAllRequestOperationHandler();
- Result<RegionAPI.PutAllResponse> result =
- operationHandler.process(serializationServiceStub, generateTestRequest(false, false),
- new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ Result<RegionAPI.PutAllResponse> result = operationHandler.process(serializationServiceStub,
+ generateTestRequest(false, false),
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Success);
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
index 5235a90..37aa379 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/PutRequestOperationHandlerJUnitTest.java
@@ -14,16 +14,32 @@
*/
package org.apache.geode.protocol.protobuf.operations;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.UnsupportedEncodingException;
+
import com.google.protobuf.ByteString;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.protobuf.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Failure;
import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Result;
import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.security.server.NoOpAuthorizer;
@@ -31,20 +47,6 @@ import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException
import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.UnsupportedEncodingException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
@Category(UnitTest.class)
public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -69,7 +71,8 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
CodecAlreadyRegisteredForTypeException, InvalidExecutionContextException {
PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
- generateTestRequest(), new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ generateTestRequest(),
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Success);
@@ -100,7 +103,8 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
RegionAPI.PutRequest putRequest =
ProtobufRequestUtilities.createPutRequest(TEST_REGION, testEntry).getPutRequest();
Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
- putRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ putRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Failure);
assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue,
@@ -114,7 +118,8 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
when(cacheStub.getRegion(TEST_REGION)).thenReturn(null);
PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
- generateTestRequest(), new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ generateTestRequest(),
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Failure);
assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
@@ -129,7 +134,8 @@ public class PutRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTe
PutRequestOperationHandler operationHandler = new PutRequestOperationHandler();
Result<RegionAPI.PutResponse> result = operationHandler.process(serializationServiceStub,
- generateTestRequest(), new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ generateTestRequest(),
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Failure);
assertEquals(ProtocolErrorCode.CONSTRAINT_VIOLATION.codeValue,
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java
index a5bde46..6e04214 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/operations/RemoveRequestOperationHandlerJUnitTest.java
@@ -14,17 +14,31 @@
*/
package org.apache.geode.protocol.protobuf.operations;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.UnsupportedEncodingException;
+
import com.google.protobuf.ByteString;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.protobuf.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Failure;
import org.apache.geode.protocol.protobuf.ProtocolErrorCode;
-import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.protocol.protobuf.Result;
import org.apache.geode.protocol.protobuf.Success;
+import org.apache.geode.protocol.protobuf.statistics.NoOpProtobufStatistics;
import org.apache.geode.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.security.server.NoOpAuthorizer;
@@ -32,18 +46,6 @@ import org.apache.geode.serialization.exception.UnsupportedEncodingTypeException
import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
import org.apache.geode.serialization.registry.exception.CodecNotRegisteredForTypeException;
import org.apache.geode.test.junit.categories.UnitTest;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.UnsupportedEncodingException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
@Category(UnitTest.class)
public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUnitTest {
@@ -74,7 +76,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
RegionAPI.RemoveRequest removeRequest = generateTestRequest(false, false).getRemoveRequest();
Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
- removeRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ removeRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Success);
verify(regionStub).remove(TEST_KEY);
@@ -86,7 +89,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
RegionAPI.RemoveRequest removeRequest = generateTestRequest(true, false).getRemoveRequest();
Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
- removeRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ removeRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Failure);
assertEquals(ProtocolErrorCode.REGION_NOT_FOUND.codeValue,
@@ -99,7 +103,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
CodecNotRegisteredForTypeException, InvalidExecutionContextException {
RegionAPI.RemoveRequest removeRequest = generateTestRequest(false, true).getRemoveRequest();
Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
- removeRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ removeRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Success);
}
@@ -122,7 +127,8 @@ public class RemoveRequestOperationHandlerJUnitTest extends OperationHandlerJUni
RegionAPI.RemoveRequest removeRequest =
ProtobufRequestUtilities.createRemoveRequest(TEST_REGION, encodedKey).getRemoveRequest();;
Result<RegionAPI.RemoveResponse> result = operationHandler.process(serializationServiceStub,
- removeRequest, new MessageExecutionContext(cacheStub, new NoOpAuthorizer()));
+ removeRequest,
+ new MessageExecutionContext(cacheStub, new NoOpAuthorizer(), new NoOpProtobufStatistics()));
assertTrue(result instanceof Failure);
assertEquals(ProtocolErrorCode.VALUE_ENCODING_ERROR.codeValue,
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/statistics/NoOpProtobufStatistics.java
similarity index 52%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/statistics/NoOpProtobufStatistics.java
index 0ced3aa..20ec40c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/statistics/NoOpProtobufStatistics.java
@@ -12,23 +12,38 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
+package org.apache.geode.protocol.protobuf.statistics;
-package org.apache.geode.internal.cache.tier.sockets;
+import org.apache.geode.protocol.protobuf.statistics.ProtobufClientStatistics;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
+public class NoOpProtobufStatistics implements ProtobufClientStatistics {
+ @Override
+ public void clientConnected() {
+ }
-/**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
- */
-public interface ClientProtocolMessageHandler {
- void receiveMessage(InputStream inputStream, OutputStream outputStream,
- MessageExecutionContext executionContext) throws IOException;
+ @Override
+ public void clientDisconnected() {
+
+ }
+
+ @Override
+ public void messageReceived(int bytes) {
+
+ }
+
+ @Override
+ public void messageSent(int bytes) {
+
+ }
+
+ @Override
+ public void incAuthorizationViolations() {
+
+ }
+
+ @Override
+ public void incAuthenticationFailures() {
+
+ }
}
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.