You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by go...@apache.org on 2017/10/10 18:17:30 UTC
[geode] branch develop updated: GEODE-3751: a single place for
client protocol loading, logic.
This is an automated email from the ASF dual-hosted git repository.
gosullivan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 033757f GEODE-3751: a single place for client protocol loading, logic.
033757f is described below
commit 033757f5569126ad84ee4badf2c283481ec205b4
Author: Brian Rowe <br...@pivotal.io>
AuthorDate: Wed Oct 4 14:13:13 2017 -0700
GEODE-3751: a single place for client protocol loading, logic.
* Load a single service, `ClientProtocolService`.
- (well, authenticators are still separate).
* Move several interfaces to `geode-protobuf` from core.
* Client protocol state and statistics are now stored on the new
`ClientProtocolPipeline` interface. This will allow us to keep
connection logic for core separated from protocol logic.
* implement pipelines for Cache and Locator.
* some other small fixes and changes.
Signed-off-by: Galen O'Sullivan <go...@pivotal.io>
This closes #873
Signed-off-by: Brian Rowe <br...@pivotal.io>
---
.../distributed/internal/InternalLocator.java | 9 ++-
.../distributed/internal/tcpserver/TcpServer.java | 34 ++++++---
.../internal/cache/tier/sockets/AcceptorImpl.java | 2 +-
...geHandler.java => ClientProtocolProcessor.java} | 32 ++++----
...sageHandler.java => ClientProtocolService.java} | 35 +++++----
...ctory.java => ClientProtocolServiceLoader.java} | 20 +++--
.../sockets/GenericProtocolServerConnection.java | 27 ++-----
.../tier/sockets/ServerConnectionFactory.java | 63 ++++++++++------
.../cache/tier/sockets/TcpServerFactory.java | 11 ++-
.../tier/sockets/ServerConnectionFactoryTest.java | 18 +++--
.../cache/tier/sockets/ServerConnectionTest.java | 6 +-
.../tier/sockets/ClientProtocolMessageHandler.java | 4 -
.../tier/sockets/MessageExecutionContext.java | 14 ++--
.../internal/protocol/ProtobufCachePipeline.java | 66 ++++++++++++++++
.../internal/protocol/ProtobufLocatorPipeline.java | 55 ++++++++++++++
.../internal/protocol/ProtobufProtocolService.java | 61 +++++++++++++++
.../protocol/protobuf/ProtobufStreamProcessor.java | 12 +--
.../protobuf/statistics}/NoOpStatistics.java | 24 +++++-
.../statistics/ProtobufClientStatistics.java | 14 ++--
...cache.tier.sockets.ClientProtocolMessageHandler | 1 -
...ternal.cache.tier.sockets.ClientProtocolService | 1 +
.../GenericProtocolServerConnectionTest.java | 39 +++++-----
.../acceptance/CacheConnectionJUnitTest.java | 9 ++-
.../acceptance/LocatorConnectionDUnitTest.java | 88 +++++++++-------------
...tAvailableServersOperationHandlerJUnitTest.java | 11 ++-
25 files changed, 433 insertions(+), 223 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
index 489e647..26864d2 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/InternalLocator.java
@@ -62,7 +62,7 @@ import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.internal.admin.remote.DistributionLocatorId;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolService;
import org.apache.geode.internal.cache.tier.sockets.TcpServerFactory;
import org.apache.geode.internal.cache.wan.WANServiceProvider;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -1335,9 +1335,10 @@ public class InternalLocator extends Locator implements ConnectListener {
try {
this.stats.hookupStats(sys,
SocketCreator.getLocalHost().getCanonicalHostName() + '-' + this.server.getBindAddress());
- ClientProtocolMessageHandler messageHandler = this.server.getClientProtocolMessageHandler();
- if (messageHandler != null) {
- messageHandler.initializeStatistics("LocatorStats", sys);
+
+ ClientProtocolService clientProtocolService = this.server.getClientProtocolService();
+ if (clientProtocolService != null) {
+ clientProtocolService.initializeStatistics("LocatorStats", sys);
}
} catch (UnknownHostException e) {
logger.warn(e);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 85b2ace..b9d10f6 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -43,6 +43,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.SystemFailure;
+import org.apache.geode.cache.IncompatibleVersionException;
import org.apache.geode.distributed.internal.ClusterConfigurationService;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
@@ -58,9 +59,9 @@ import org.apache.geode.internal.VersionedDataInputStream;
import org.apache.geode.internal.VersionedDataOutputStream;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolService;
import org.apache.geode.internal.cache.tier.sockets.HandShake;
-import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.net.SocketCreatorFactory;
@@ -133,7 +134,7 @@ public class TcpServer {
private final PoolStatHelper poolHelper;
private final InternalLocator internalLocator;
private final TcpHandler handler;
- private ClientProtocolMessageHandler clientProtocolMessageHandler;
+ private final ClientProtocolService clientProtocolService;
private PooledExecutorWithDMStats executor;
@@ -158,20 +159,20 @@ public class TcpServer {
/**
* returns the message handler used for client/locator communications processing
*/
- public ClientProtocolMessageHandler getClientProtocolMessageHandler() {
- return clientProtocolMessageHandler;
+ public ClientProtocolService getClientProtocolService() {
+ return clientProtocolService;
}
public TcpServer(int port, InetAddress bind_address, Properties sslConfig,
DistributionConfigImpl cfg, TcpHandler handler, PoolStatHelper poolHelper,
ThreadGroup threadGroup, String threadName, InternalLocator internalLocator,
- ClientProtocolMessageHandler clientProtocolMessageHandler) {
+ ClientProtocolService clientProtocolService) {
this.port = port;
this.bind_address = bind_address;
this.handler = handler;
this.poolHelper = poolHelper;
this.internalLocator = internalLocator;
- this.clientProtocolMessageHandler = clientProtocolMessageHandler;
+ this.clientProtocolService = clientProtocolService;
// register DSFID types first; invoked explicitly so that all message type
// initializations do not happen in first deserialization on a possibly
// "precious" thread
@@ -381,10 +382,21 @@ public class TcpServer {
if (gossipVersion == NON_GOSSIP_REQUEST_VERSION) {
if (input.readUnsignedByte() == PROTOBUF_CLIENT_SERVER_PROTOCOL
&& Boolean.getBoolean("geode.feature-protobuf-protocol")) {
- clientProtocolMessageHandler.getStatistics().clientConnected();
- clientProtocolMessageHandler.receiveMessage(input, socket.getOutputStream(),
- new MessageExecutionContext(internalLocator));
- clientProtocolMessageHandler.getStatistics().clientDisconnected();
+ if (clientProtocolService == null) {
+ // this shouldn't happen.
+ log.error("Client protocol service not initialized but a request was received");
+ socket.close();
+ throw new IOException(
+ "Client protocol service not initialized but a request was received");
+ } else {
+ try (ClientProtocolProcessor pipeline =
+ clientProtocolService.createProcessorForLocator(internalLocator)) {
+ pipeline.processMessage(input, socket.getOutputStream());
+ } catch (IncompatibleVersionException e) {
+ // should not happen on the locator as there is no handshake.
+ log.error("Unexpected exception in client message processing", e);
+ }
+ }
} else {
rejectUnknownProtocolConnection(socket, gossipVersion);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index c660b68..704369e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -1463,7 +1463,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
ServerConnection serverConn =
serverConnectionFactory.makeServerConnection(socket, this.cache, this.crHelper, this.stats,
AcceptorImpl.handShakeTimeout, this.socketBufferSize, communicationMode.toString(),
- communicationMode.getModeNumber(), this, this.securityService, this.getBindAddress());
+ communicationMode.getModeNumber(), this, this.securityService);
synchronized (this.allSCsLock) {
this.allSCs.add(serverConn);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolProcessor.java
similarity index 55%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolProcessor.java
index b8969e1..96d2a89 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolProcessor.java
@@ -19,23 +19,25 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.apache.geode.Statistics;
-import org.apache.geode.StatisticsFactory;
-
+import org.apache.geode.cache.IncompatibleVersionException;
/**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
+ * An interface that does the message handling part of a protocol for a particular connection. It
+ * does not manage the socket.
*/
-public interface ClientProtocolMessageHandler {
- void initializeStatistics(String statisticsName, StatisticsFactory factory);
-
- ClientProtocolStatistics getStatistics();
+public interface ClientProtocolProcessor extends AutoCloseable {
+ /**
+ * @throws IncompatibleVersionException if a client tries to connect with version that is
+ * incompatible with the current version of the server.
+ */
+ void processMessage(InputStream inputStream, OutputStream outputStream)
+ throws IOException, IncompatibleVersionException;
- void receiveMessage(InputStream inputStream, OutputStream outputStream,
- MessageExecutionContext executionContext) throws IOException;
+ /**
+ * Close the pipeline, incrementing stats and releasing any resources.
+ *
+ * This declaration narrows the exception type to be IOException.
+ */
+ @Override
+ void close();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolService.java
similarity index 51%
copy from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
copy to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolService.java
index b8969e1..64f5365 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolService.java
@@ -15,27 +15,30 @@
package org.apache.geode.internal.cache.tier.sockets;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.geode.Statistics;
import org.apache.geode.StatisticsFactory;
-
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.server.Authenticator;
/**
- * This is an interface that other modules can implement to hook into
- * {@link GenericProtocolServerConnection} to handle messages sent to Geode.
- *
- * Currently, only one {@link ClientProtocolMessageHandler} at a time can be used in a Geode
- * instance. It gets wired into {@link ServerConnectionFactory} to create all instances of
- * {@link GenericProtocolServerConnection}.
+ * Provides a convenient location for a client protocol service to be loaded into the system.
*/
-public interface ClientProtocolMessageHandler {
+public interface ClientProtocolService {
void initializeStatistics(String statisticsName, StatisticsFactory factory);
- ClientProtocolStatistics getStatistics();
+ /**
+ *
+ * The pipeline MUST use an available authenticator for authentication of all operations once the
+ * handshake has happened.
+ *
+ * @param availableAuthenticators A list of valid authenticators for the current system.
+ */
+ ClientProtocolProcessor createProcessorForCache(Cache cache,
+ Authenticator availableAuthenticators, SecurityService securityService);
- void receiveMessage(InputStream inputStream, OutputStream outputStream,
- MessageExecutionContext executionContext) throws IOException;
+ /**
+ * Create a locator pipeline. The locator does not currently provide any authentication.
+ */
+ ClientProtocolProcessor createProcessorForLocator(InternalLocator locator);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolServiceLoader.java
similarity index 62%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java
rename to geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolServiceLoader.java
index 2aca8c2..7e1e3f5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageHandlerFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolServiceLoader.java
@@ -18,17 +18,23 @@ package org.apache.geode.internal.cache.tier.sockets;
import java.util.Iterator;
import java.util.ServiceLoader;
-public class MessageHandlerFactory {
- public ClientProtocolMessageHandler makeMessageHandler() {
- ServiceLoader<ClientProtocolMessageHandler> loader =
- ServiceLoader.load(ClientProtocolMessageHandler.class);
- Iterator<ClientProtocolMessageHandler> iterator = loader.iterator();
+public class ClientProtocolServiceLoader {
+ public ClientProtocolService loadService() {
+ ServiceLoader<ClientProtocolService> loader = ServiceLoader.load(ClientProtocolService.class);
+ Iterator<ClientProtocolService> iterator = loader.iterator();
if (!iterator.hasNext()) {
throw new ServiceLoadingFailureException(
- "There is no ClientProtocolMessageHandler implementation found in JVM");
+ "There is no ClientProtocolService implementation found in JVM");
}
- return iterator.next();
+ ClientProtocolService service = iterator.next();
+
+ if (iterator.hasNext()) {
+ throw new ServiceLoadingFailureException(
+ "There is more than one ClientProtocolService implementation found in JVM; aborting");
+ }
+
+ return service;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
index eb401b9..5be6cac 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -22,6 +22,7 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
+import org.apache.geode.cache.IncompatibleVersionException;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ServerLocation;
@@ -31,17 +32,13 @@ import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.SecurityManager;
-import org.apache.geode.security.server.Authenticator;
/**
* Holds the socket and protocol handler for the new client protocol.
*/
public class GenericProtocolServerConnection extends ServerConnection {
// The new protocol lives in a separate module and gets loaded when this class is instantiated.
- private final ClientProtocolMessageHandler messageHandler;
- private final SecurityManager securityManager;
- private final Authenticator authenticator;
+ private final ClientProtocolProcessor protocolPipeline;
private boolean cleanedUp;
private ClientProxyMembershipID clientProxyMembershipID;
@@ -51,14 +48,11 @@ public class GenericProtocolServerConnection extends ServerConnection {
*/
public GenericProtocolServerConnection(Socket socket, InternalCache c, CachedRegionHelper helper,
CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
- byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler newClientProtocol,
- SecurityService securityService, Authenticator authenticator) {
+ byte communicationMode, Acceptor acceptor, ClientProtocolProcessor clientProtocolProcessor,
+ SecurityService securityService) {
super(socket, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr,
communicationMode, acceptor, securityService);
- securityManager = securityService.getSecurityManager();
- this.messageHandler = newClientProtocol;
- this.authenticator = authenticator;
- this.messageHandler.getStatistics().clientConnected();
+ this.protocolPipeline = clientProtocolProcessor;
setClientProxyMembershipId();
@@ -72,17 +66,12 @@ public class GenericProtocolServerConnection extends ServerConnection {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
- if (!authenticator.isAuthenticated()) {
- authenticator.authenticate(inputStream, outputStream, securityManager);
- } else {
- messageHandler.receiveMessage(inputStream, outputStream, new MessageExecutionContext(
- this.getCache(), authenticator.getAuthorizer(), messageHandler.getStatistics()));
- }
+ protocolPipeline.processMessage(inputStream, outputStream);
} catch (EOFException e) {
this.setFlagProcessMessagesAsFalse();
setClientDisconnectedException(e);
logger.debug("Encountered EOF while processing message: {}", e);
- } catch (IOException e) {
+ } catch (IOException | IncompatibleVersionException e) {
logger.warn(e);
this.setFlagProcessMessagesAsFalse();
setClientDisconnectedException(e);
@@ -105,7 +94,7 @@ public class GenericProtocolServerConnection extends ServerConnection {
synchronized (this) {
if (!cleanedUp) {
cleanedUp = true;
- messageHandler.getStatistics().clientDisconnected();
+ protocolPipeline.close();
}
}
return super.cleanup();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index 8f3a8c3..e550700 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -18,7 +18,6 @@ package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
import java.io.IOException;
-import java.net.InetAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
@@ -35,7 +34,7 @@ import org.apache.geode.security.server.Authenticator;
* Creates instances of ServerConnection based on the connection mode provided.
*/
public class ServerConnectionFactory {
- private ClientProtocolMessageHandler protocolHandler;
+ private volatile ClientProtocolService clientProtocolService;
private Map<String, Class<? extends Authenticator>> authenticators = null;
public ServerConnectionFactory() {}
@@ -44,7 +43,7 @@ public class ServerConnectionFactory {
if (authenticators != null) {
return;
}
- HashMap tmp = new HashMap<>();
+ HashMap<String, Class<? extends Authenticator>> tmp = new HashMap<>();
ServiceLoader<Authenticator> loader = ServiceLoader.load(Authenticator.class);
for (Authenticator streamAuthenticator : loader) {
@@ -54,17 +53,18 @@ public class ServerConnectionFactory {
authenticators = tmp;
}
- private synchronized ClientProtocolMessageHandler initializeMessageHandler(
+ private synchronized ClientProtocolService initializeClientProtocolService(
StatisticsFactory statisticsFactory, String statisticsName) {
- if (protocolHandler != null) {
- return protocolHandler;
+ if (clientProtocolService != null) {
+ return clientProtocolService;
}
- ClientProtocolMessageHandler tempHandler = new MessageHandlerFactory().makeMessageHandler();
- tempHandler.initializeStatistics(statisticsName, statisticsFactory);
+ // use temp to make sure we publish properly.
+ ClientProtocolService tmp = new ClientProtocolServiceLoader().loadService();
+ tmp.initializeStatistics(statisticsName, statisticsFactory);
- protocolHandler = tempHandler;
- return protocolHandler;
+ clientProtocolService = tmp;
+ return clientProtocolService;
}
private Authenticator findStreamAuthenticator(String implementationID) {
@@ -86,34 +86,51 @@ public class ServerConnectionFactory {
}
}
- private ClientProtocolMessageHandler getOrCreateClientProtocolMessageHandler(
+ private ClientProtocolService getOrCreateClientProtocolService(
StatisticsFactory statisticsFactory, String serverName) {
- if (protocolHandler == null) {
- return initializeMessageHandler(statisticsFactory, serverName);
+ if (clientProtocolService == null) {
+ return initializeClientProtocolService(statisticsFactory, serverName);
}
- return protocolHandler;
+ return clientProtocolService;
}
public ServerConnection makeServerConnection(Socket socket, InternalCache cache,
CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
String communicationModeStr, byte communicationMode, Acceptor acceptor,
- SecurityService securityService, InetAddress bindAddress) throws IOException {
+ SecurityService securityService) throws IOException {
if (communicationMode == ProtobufClientServerProtocol.getModeNumber()) {
if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
throw new IOException("Server received unknown communication mode: " + communicationMode);
} else {
- String authenticationMode =
- System.getProperty("geode.protocol-authentication-mode", "NOOP");
-
- return new GenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
- socketBufferSize, communicationModeStr, communicationMode, acceptor,
- getOrCreateClientProtocolMessageHandler(cache.getDistributedSystem(),
- acceptor.getServerName()),
- securityService, findStreamAuthenticator(authenticationMode));
+ try {
+ String authenticationMode =
+ System.getProperty("geode.protocol-authentication-mode", "NOOP");
+
+ return createGenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
+ socketBufferSize, communicationModeStr, communicationMode, acceptor, securityService,
+ authenticationMode);
+ } catch (ServiceLoadingFailureException ex) {
+ throw new IOException("Could not load protobuf client protocol", ex);
+ }
}
} else {
return new LegacyServerConnection(socket, cache, helper, stats, hsTimeout, socketBufferSize,
communicationModeStr, communicationMode, acceptor, securityService);
}
}
+
+ private ServerConnection createGenericProtocolServerConnection(Socket socket, InternalCache cache,
+ CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
+ String communicationModeStr, byte communicationMode, Acceptor acceptor,
+ SecurityService securityService, String authenticationMode) {
+ ClientProtocolService service =
+ getOrCreateClientProtocolService(cache.getDistributedSystem(), acceptor.getServerName());
+
+ ClientProtocolProcessor processor = service.createProcessorForCache(cache,
+ findStreamAuthenticator(authenticationMode), securityService);
+
+ return new GenericProtocolServerConnection(socket, cache, helper, stats, hsTimeout,
+ socketBufferSize, communicationModeStr, communicationMode, acceptor, processor,
+ securityService);
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java
index 959e6e2..bf442f8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/TcpServerFactory.java
@@ -20,6 +20,7 @@ import java.util.Properties;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.cache.IncompatibleVersionException;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.PoolStatHelper;
@@ -28,15 +29,17 @@ import org.apache.geode.distributed.internal.tcpserver.TcpServer;
import org.apache.geode.internal.logging.LogService;
public class TcpServerFactory {
- private ClientProtocolMessageHandler protocolHandler;
+ private final ClientProtocolService clientProtocolService;
static final Logger logger = LogService.getLogger();
public TcpServerFactory() {
+ ClientProtocolService service = null;
try {
- protocolHandler = new MessageHandlerFactory().makeMessageHandler();
+ service = new ClientProtocolServiceLoader().loadService();
} catch (ServiceLoadingFailureException ex) {
- logger.warn(ex.getMessage());
+ logger.warn("Could not load client protocol", ex);
}
+ clientProtocolService = service;
}
public TcpServer makeTcpServer(int port, InetAddress bind_address, Properties sslConfig,
@@ -44,6 +47,6 @@ public class TcpServerFactory {
ThreadGroup threadGroup, String threadName, InternalLocator internalLocator) {
return new TcpServer(port, bind_address, sslConfig, cfg, handler, poolHelper, threadGroup,
- threadName, internalLocator, protocolHandler);
+ threadName, internalLocator, clientProtocolService);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index 56d3770..a3f34b0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -20,6 +20,8 @@ import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.test.junit.categories.UnitTest;
+
+import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.RestoreSystemProperties;
@@ -59,14 +61,16 @@ public class ServerConnectionFactoryTest {
}
/**
- * @throws ServiceLoadingFailureException because the service is implemented in a different
- * module, and when this unit test is run, that module won't be present.
+ * @throws IOException caused by ServiceLoadingFailureException because the service is implemented
+ * in a different module, and when this unit test is run, that module won't be present.
*/
- @Test(expected = ServiceLoadingFailureException.class)
+ @Test
public void newClientProtocolFailsWithSystemPropertySet() throws IOException {
- System.setProperty("geode.feature-protobuf-protocol", "true");
- ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(
- CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+ Assertions.assertThatThrownBy(() -> {
+ System.setProperty("geode.feature-protobuf-protocol", "true");
+ ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(
+ CommunicationMode.ProtobufClientServerProtocol.getModeNumber());
+ }).hasRootCauseInstanceOf(ServiceLoadingFailureException.class);
}
@Test
@@ -107,7 +111,7 @@ public class ServerConnectionFactoryTest {
return new ServerConnectionFactory().makeServerConnection(socketMock, mock(InternalCache.class),
mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "", communicationMode,
- mock(AcceptorImpl.class), mock(SecurityService.class), InetAddress.getLocalHost());
+ mock(AcceptorImpl.class), mock(SecurityService.class));
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index a4ebbac..bd23223 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -82,9 +82,9 @@ public class ServerConnectionTest {
InternalCache cache = mock(InternalCache.class);
SecurityService securityService = mock(SecurityService.class);
- serverConnection = new ServerConnectionFactory().makeServerConnection(socket, cache, null, null,
- 0, 0, null, CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor,
- securityService, InetAddress.getLocalHost());
+ serverConnection =
+ new ServerConnectionFactory().makeServerConnection(socket, cache, null, null, 0, 0, null,
+ CommunicationMode.PrimaryServerToClient.getModeNumber(), acceptor, securityService);
MockitoAnnotations.initMocks(this);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
similarity index 92%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
index b8969e1..1d86d70 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProtocolMessageHandler.java
@@ -32,10 +32,6 @@ import org.apache.geode.StatisticsFactory;
* {@link GenericProtocolServerConnection}.
*/
public interface ClientProtocolMessageHandler {
- void initializeStatistics(String statisticsName, StatisticsFactory factory);
-
- ClientProtocolStatistics getStatistics();
-
void receiveMessage(InputStream inputStream, OutputStream outputStream,
MessageExecutionContext executionContext) throws IOException;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java b/geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
similarity index 85%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
index fd1ed4d..cce750c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/cache/tier/sockets/MessageExecutionContext.java
@@ -20,6 +20,8 @@ import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
import org.apache.geode.security.server.Authorizer;
import org.apache.geode.security.server.NoOpAuthorizer;
@@ -27,23 +29,23 @@ import org.apache.geode.security.server.NoOpAuthorizer;
public class MessageExecutionContext {
private Cache cache;
private Locator locator;
- private Authorizer authorizer;
- private ClientProtocolStatistics statistics;
+ private final Authorizer authorizer;
+ private final ProtobufClientStatistics statistics;
public MessageExecutionContext(Cache cache, Authorizer streamAuthorizer,
- ClientProtocolStatistics statistics) {
+ ProtobufClientStatistics statistics) {
this.cache = cache;
this.authorizer = streamAuthorizer;
this.statistics = statistics;
}
- public MessageExecutionContext(InternalLocator locator) {
+ public MessageExecutionContext(InternalLocator locator, ProtobufClientStatistics statistics) {
this.locator = locator;
// set a no-op authorizer until such time as locators implement authentication
// and authorization checks
this.authorizer = new NoOpAuthorizer();
- this.statistics = new NoOpStatistics();
+ this.statistics = statistics;
}
/**
@@ -86,7 +88,7 @@ public class MessageExecutionContext {
* Returns the statistics for recording operation stats. In a unit test environment this may not
* be a protocol-specific statistics implementation.
*/
- public ClientProtocolStatistics getStatistics() {
+ public ProtobufClientStatistics getStatistics() {
return statistics;
}
}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufCachePipeline.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufCachePipeline.java
new file mode 100644
index 0000000..36bb477
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufCachePipeline.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.IncompatibleVersionException;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.ProtobufStreamProcessor;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.server.Authenticator;
+
+@Experimental
+public final class ProtobufCachePipeline implements ClientProtocolProcessor {
+ private final ProtobufClientStatistics statistics;
+ private final Cache cache;
+ private final SecurityService securityService;
+ private final ProtobufStreamProcessor streamProcessor;
+ private final Authenticator authenticator;
+
+ ProtobufCachePipeline(ProtobufStreamProcessor protobufStreamProcessor,
+ ProtobufClientStatistics statistics, Cache cache, Authenticator authenticator,
+ SecurityService securityService) {
+ this.streamProcessor = protobufStreamProcessor;
+ this.statistics = statistics;
+ this.cache = cache;
+ this.authenticator = authenticator;
+ this.securityService = securityService;
+ this.statistics.clientConnected();
+ }
+
+ @Override
+ public void processMessage(InputStream inputStream, OutputStream outputStream)
+ throws IOException, IncompatibleVersionException {
+ if (!authenticator.isAuthenticated()) {
+ authenticator.authenticate(inputStream, outputStream, securityService.getSecurityManager());
+ } else {
+ streamProcessor.receiveMessage(inputStream, outputStream,
+ new MessageExecutionContext(cache, authenticator.getAuthorizer(), statistics));
+ }
+ }
+
+ @Override
+ public void close() {
+ this.statistics.clientDisconnected();
+ }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufLocatorPipeline.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufLocatorPipeline.java
new file mode 100644
index 0000000..f4ed9e2
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufLocatorPipeline.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.protocol;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.cache.IncompatibleVersionException;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.sockets.MessageExecutionContext;
+import org.apache.geode.internal.protocol.protobuf.ProtobufStreamProcessor;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
+
+@Experimental
+public final class ProtobufLocatorPipeline implements ClientProtocolProcessor {
+ private final ProtobufClientStatistics statistics;
+ private final InternalLocator locator;
+ private final ProtobufStreamProcessor streamProcessor;
+
+ ProtobufLocatorPipeline(ProtobufStreamProcessor protobufStreamProcessor,
+ ProtobufClientStatistics statistics, InternalLocator locator) {
+ this.streamProcessor = protobufStreamProcessor;
+ this.statistics = statistics;
+ this.locator = locator;
+ this.statistics.clientConnected();
+ }
+
+ @Override
+ public void processMessage(InputStream inputStream, OutputStream outputStream)
+ throws IOException, IncompatibleVersionException {
+ streamProcessor.receiveMessage(inputStream, outputStream,
+ new MessageExecutionContext(locator, statistics));
+ }
+
+ @Override
+ public void close() {
+ this.statistics.clientDisconnected();
+ }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufProtocolService.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufProtocolService.java
new file mode 100644
index 0000000..25df248
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/ProtobufProtocolService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol;
+
+import org.apache.geode.StatisticsFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolProcessor;
+import org.apache.geode.internal.cache.tier.sockets.ClientProtocolService;
+import org.apache.geode.internal.protocol.protobuf.ProtobufStreamProcessor;
+import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatisticsImpl;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.server.Authenticator;
+
+public class ProtobufProtocolService implements ClientProtocolService {
+ private volatile ProtobufClientStatistics statistics;
+ private final ProtobufStreamProcessor protobufStreamProcessor = new ProtobufStreamProcessor();
+
+ @Override
+ public synchronized void initializeStatistics(String statisticsName, StatisticsFactory factory) {
+ statistics = new ProtobufClientStatisticsImpl(factory, statisticsName,
+ ProtobufClientStatistics.PROTOBUF_STATS_NAME);
+ }
+
+ /**
+ * For internal use. This is necessary because the statistics may get initialized in another
+ * thread.
+ */
+ private ProtobufClientStatistics getStatistics() {
+ if (statistics == null) {
+ return new NoOpStatistics();
+ }
+ return statistics;
+ }
+
+ @Override
+ public ClientProtocolProcessor createProcessorForCache(Cache cache, Authenticator authenticator,
+ SecurityService securityService) {
+ return new ProtobufCachePipeline(protobufStreamProcessor, getStatistics(), cache, authenticator,
+ securityService);
+ }
+
+ @Override
+ public ClientProtocolProcessor createProcessorForLocator(InternalLocator locator) {
+ return new ProtobufLocatorPipeline(protobufStreamProcessor, getStatistics(), locator);
+ }
+}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java
index 5e06875..3571821 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/ProtobufStreamProcessor.java
@@ -43,7 +43,6 @@ import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
private final ProtobufProtocolSerializer protobufProtocolSerializer;
private final ProtobufOpsProcessor protobufOpsProcessor;
- private ProtobufClientStatistics statistics;
private static final Logger logger = LogService.getLogger();
public ProtobufStreamProcessor() {
@@ -53,16 +52,6 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
}
@Override
- public void initializeStatistics(String statisticsName, StatisticsFactory factory) {
- statistics = new ProtobufClientStatisticsImpl(factory, statisticsName, "ProtobufServerStats");
- }
-
- @Override
- public ClientProtocolStatistics getStatistics() {
- return statistics;
- }
-
- @Override
public void receiveMessage(InputStream inputStream, OutputStream outputStream,
MessageExecutionContext executionContext) throws IOException {
try {
@@ -81,6 +70,7 @@ public class ProtobufStreamProcessor implements ClientProtocolMessageHandler {
logger.debug(errorMessage);
throw new EOFException(errorMessage);
}
+ ProtobufClientStatistics statistics = executionContext.getStatistics();
statistics.messageReceived(message.getSerializedSize());
ClientProtocol.Request request = message.getRequest();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/NoOpStatistics.java
similarity index 70%
rename from geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
rename to geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/NoOpStatistics.java
index d04db47..e06ea8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/NoOpStatistics.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/NoOpStatistics.java
@@ -12,9 +12,9 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-package org.apache.geode.internal.cache.tier.sockets;
+package org.apache.geode.internal.protocol.protobuf.statistics;
-public class NoOpStatistics implements ClientProtocolStatistics {
+public class NoOpStatistics implements ProtobufClientStatistics {
@Override
public void clientConnected() {
@@ -24,4 +24,24 @@ public class NoOpStatistics implements ClientProtocolStatistics {
public void clientDisconnected() {
}
+
+ @Override
+ public void messageReceived(int bytes) {
+
+ }
+
+ @Override
+ public void messageSent(int bytes) {
+
+ }
+
+ @Override
+ public void incAuthorizationViolations() {
+
+ }
+
+ @Override
+ public void incAuthenticationFailures() {
+
+ }
}
diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/ProtobufClientStatistics.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/ProtobufClientStatistics.java
index 95f2180..afd9648 100644
--- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/ProtobufClientStatistics.java
+++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/statistics/ProtobufClientStatistics.java
@@ -17,15 +17,17 @@ package org.apache.geode.internal.protocol.protobuf.statistics;
import org.apache.geode.internal.cache.tier.sockets.ClientProtocolStatistics;
public interface ProtobufClientStatistics extends ClientProtocolStatistics {
- public void clientConnected();
+ String PROTOBUF_STATS_NAME = "ProtobufStats";
- public void clientDisconnected();
+ void clientConnected();
- public void messageReceived(int bytes);
+ void clientDisconnected();
- public void messageSent(int bytes);
+ void messageReceived(int bytes);
- public void incAuthorizationViolations();
+ void messageSent(int bytes);
- public void incAuthenticationFailures();
+ void incAuthorizationViolations();
+
+ void incAuthenticationFailures();
}
diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler
deleted file mode 100644
index a3a005e..0000000
--- a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolMessageHandler
+++ /dev/null
@@ -1 +0,0 @@
-org.apache.geode.internal.protocol.protobuf.ProtobufStreamProcessor
\ No newline at end of file
diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolService b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolService
new file mode 100644
index 0000000..207426a
--- /dev/null
+++ b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.internal.cache.tier.sockets.ClientProtocolService
@@ -0,0 +1 @@
+org.apache.geode.internal.protocol.ProtobufProtocolService
\ No newline at end of file
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
index b35d923..98aa1b9 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
@@ -33,13 +33,12 @@ import org.junit.experimental.categories.Category;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
+import org.apache.geode.cache.IncompatibleVersionException;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.CommunicationMode;
import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.internal.protocol.protobuf.statistics.NoOpProtobufStatistics;
-import org.apache.geode.security.server.NoOpAuthenticator;
import org.apache.geode.test.junit.categories.UnitTest;
@Category(UnitTest.class)
@@ -48,7 +47,7 @@ public class GenericProtocolServerConnectionTest {
private ClientHealthMonitor clientHealthMonitorMock;
@Test
- public void testProcessFlag() throws IOException {
+ public void testProcessFlag() throws Exception {
ServerConnection serverConnection = IOExceptionThrowingServerConnection();
Assert.assertTrue(serverConnection.processMessages);
serverConnection.doOneMessage();
@@ -61,9 +60,9 @@ public class GenericProtocolServerConnectionTest {
when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
- ClientProtocolMessageHandler mockHandler = mock(ClientProtocolMessageHandler.class);
+ ClientProtocolProcessor clientProtocolProcessorMock = mock(ClientProtocolProcessor.class);
GenericProtocolServerConnection genericProtocolServerConnection =
- getServerConnection(socketMock, mockHandler, acceptorStub);
+ getServerConnection(socketMock, clientProtocolProcessorMock, acceptorStub);
genericProtocolServerConnection.emergencyClose();
@@ -74,9 +73,9 @@ public class GenericProtocolServerConnectionTest {
public void testClientHealthMonitorRegistration() throws UnknownHostException {
AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
- ClientProtocolMessageHandler clientProtocolMock = mock(ClientProtocolMessageHandler.class);
+ ClientProtocolProcessor clientProtocolProcessor = mock(ClientProtocolProcessor.class);
- ServerConnection serverConnection = getServerConnection(clientProtocolMock, acceptorStub);
+ ServerConnection serverConnection = getServerConnection(clientProtocolProcessor, acceptorStub);
ArgumentCaptor<ClientProxyMembershipID> registerCpmidArgumentCaptor =
ArgumentCaptor.forClass(ClientProxyMembershipID.class);
@@ -96,9 +95,9 @@ public class GenericProtocolServerConnectionTest {
@Test
public void testDoOneMessageNotifiesClientHealthMonitor() throws UnknownHostException {
AcceptorImpl acceptorStub = mock(AcceptorImpl.class);
- ClientProtocolMessageHandler clientProtocolMock = mock(ClientProtocolMessageHandler.class);
+ ClientProtocolProcessor clientProtocolProcessor = mock(ClientProtocolProcessor.class);
- ServerConnection serverConnection = getServerConnection(clientProtocolMock, acceptorStub);
+ ServerConnection serverConnection = getServerConnection(clientProtocolProcessor, acceptorStub);
serverConnection.doOneMessage();
ArgumentCaptor<ClientProxyMembershipID> clientProxyMembershipIDArgumentCaptor =
@@ -108,17 +107,15 @@ public class GenericProtocolServerConnectionTest {
clientProxyMembershipIDArgumentCaptor.getValue().toString());
}
- private GenericProtocolServerConnection IOExceptionThrowingServerConnection() throws IOException {
- ClientProtocolMessageHandler clientProtocolMock = mock(ClientProtocolMessageHandler.class);
- ClientProtocolStatistics statisticsMock = mock(ClientProtocolStatistics.class);
- when(clientProtocolMock.getStatistics()).thenReturn(statisticsMock);
- doThrow(new IOException()).when(clientProtocolMock).receiveMessage(any(), any(), any());
-
- return getServerConnection(clientProtocolMock, mock(AcceptorImpl.class));
+ private GenericProtocolServerConnection IOExceptionThrowingServerConnection()
+ throws IOException, IncompatibleVersionException {
+ ClientProtocolProcessor clientProtocolProcessor = mock(ClientProtocolProcessor.class);
+ doThrow(new IOException()).when(clientProtocolProcessor).processMessage(any(), any());
+ return getServerConnection(clientProtocolProcessor, mock(AcceptorImpl.class));
}
private GenericProtocolServerConnection getServerConnection(Socket socketMock,
- ClientProtocolMessageHandler clientProtocolMock, AcceptorImpl acceptorStub)
+ ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl acceptorStub)
throws UnknownHostException {
clientHealthMonitorMock = mock(ClientHealthMonitor.class);
when(acceptorStub.getClientHealthMonitor()).thenReturn(clientHealthMonitorMock);
@@ -128,18 +125,16 @@ public class GenericProtocolServerConnectionTest {
when(socketMock.getRemoteSocketAddress()).thenReturn(inetSocketAddressStub);
when(socketMock.getInetAddress()).thenReturn(inetAddressStub);
- when(clientProtocolMock.getStatistics()).thenReturn(new NoOpProtobufStatistics());
-
return new GenericProtocolServerConnection(socketMock, mock(InternalCache.class),
mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
CommunicationMode.ProtobufClientServerProtocol.getModeNumber(), acceptorStub,
- clientProtocolMock, mock(SecurityService.class), new NoOpAuthenticator());
+ clientProtocolProcessorMock, mock(SecurityService.class));
}
private GenericProtocolServerConnection getServerConnection(
- ClientProtocolMessageHandler clientProtocolMock, AcceptorImpl acceptorStub)
+ ClientProtocolProcessor clientProtocolProcessorMock, AcceptorImpl acceptorStub)
throws UnknownHostException {
Socket socketMock = mock(Socket.class);
- return getServerConnection(socketMock, clientProtocolMock, acceptorStub);
+ return getServerConnection(socketMock, clientProtocolProcessorMock, acceptorStub);
}
}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
index d4f90a5..99b0e1f 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/CacheConnectionJUnitTest.java
@@ -66,6 +66,7 @@ import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.ProtobufSerializationService;
import org.apache.geode.internal.protocol.protobuf.RegionAPI;
import org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.internal.serialization.SerializationService;
import org.apache.geode.internal.serialization.exception.UnsupportedEncodingTypeException;
@@ -172,10 +173,10 @@ public class CacheConnectionJUnitTest {
InternalDistributedSystem distributedSystem =
(InternalDistributedSystem) cache.getDistributedSystem();
- Statistics[] protobufServerStats =
- distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
- assertEquals(1, protobufServerStats.length);
- Statistics statistics = protobufServerStats[0];
+ Statistics[] protobufStats = distributedSystem.findStatisticsByType(
+ distributedSystem.findType(ProtobufClientStatistics.PROTOBUF_STATS_NAME));
+ assertEquals(1, protobufStats.length);
+ Statistics statistics = protobufStats[0];
assertEquals(1, statistics.get("currentClientConnections"));
assertEquals(2L, statistics.get("messagesReceived"));
assertEquals(2L, statistics.get("messagesSent"));
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
index 1a3f654..1f4579c 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/acceptance/LocatorConnectionDUnitTest.java
@@ -18,7 +18,6 @@ package org.apache.geode.internal.protocol.acceptance;
import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -36,17 +35,17 @@ import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
-import org.apache.geode.internal.protocol.protobuf.ServerAPI;
import org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
+import org.apache.geode.internal.protocol.protobuf.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.ProtocolErrorCode;
+import org.apache.geode.internal.protocol.protobuf.ServerAPI;
import org.apache.geode.internal.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufUtilities;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.RMIException;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.junit.categories.DistributedTest;
@@ -63,9 +62,9 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
@Before
public void setup() throws IOException {
- startCacheWithCacheServer();
-
Host.getLocator().invoke(() -> System.setProperty("geode.feature-protobuf-protocol", "true"));
+
+ startCacheWithCacheServer();
}
private Socket createSocket() throws IOException {
@@ -89,52 +88,35 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
protobufRequestBuilder.setGetAvailableServersRequest(
ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
- try {
- ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+ ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();
+
+ testSocketWithStats(getAvailableServersRequestMessage, protobufProtocolSerializer);
+
+ testSocketWithStats(getAvailableServersRequestMessage, protobufProtocolSerializer);
+ }
+
+ private void testSocketWithStats(ClientProtocol.Message getAvailableServersRequestMessage,
+ ProtobufProtocolSerializer protobufProtocolSerializer)
+ throws IOException, InvalidProtocolMessageException {
+ try (Socket socket = createSocket()) {
+ long messagesReceived = getMessagesReceived();
+ long messagesSent = getMessagesSent();
+ long bytesReceived = getBytesReceived();
+ long bytesSent = getBytesSent();
+ int clientConnectionStarts = getClientConnectionStarts();
+ int clientConnectionTerminations = getClientConnectionTerminations();
- try (Socket socket = createSocket()) {
- long messagesReceived = getMessagesReceived();
- long messagesSent = getMessagesSent();
- long bytesReceived = getBytesReceived();
- long bytesSent = getBytesSent();
- int clientConnectionStarts = getClientConnectionStarts();
- int clientConnectionTerminations = getClientConnectionTerminations();
-
- protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
- socket.getOutputStream());
-
- ClientProtocol.Message getAvailableServersResponseMessage =
- protobufProtocolSerializer.deserialize(socket.getInputStream());
- validateGetAvailableServersResponse(getAvailableServersResponseMessage);
-
- validateStats(messagesReceived + 1, messagesSent + 1,
- bytesReceived + getAvailableServersRequestMessage.getSerializedSize(),
- bytesSent + getAvailableServersResponseMessage.getSerializedSize(),
- clientConnectionStarts, clientConnectionTerminations + 1);
- }
-
- try (Socket socket = createSocket()) {
- long messagesReceived = getMessagesReceived();
- long messagesSent = getMessagesSent();
- long bytesReceived = getBytesReceived();
- long bytesSent = getBytesSent();
- int clientConnectionStarts = getClientConnectionStarts();
- int clientConnectionTerminations = getClientConnectionTerminations();
-
- protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
- socket.getOutputStream());
-
- ClientProtocol.Message getAvailableServersResponseMessage =
- protobufProtocolSerializer.deserialize(socket.getInputStream());
- validateGetAvailableServersResponse(getAvailableServersResponseMessage);
-
- validateStats(messagesReceived + 1, messagesSent + 1,
- bytesReceived + getAvailableServersRequestMessage.getSerializedSize(),
- bytesSent + getAvailableServersResponseMessage.getSerializedSize(),
- clientConnectionStarts, clientConnectionTerminations + 1);
- }
- } catch (RMIException e) {
- throw e.getCause(); // so that assertions propagate properly.
+ protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
+ socket.getOutputStream());
+
+ ClientProtocol.Message getAvailableServersResponseMessage =
+ protobufProtocolSerializer.deserialize(socket.getInputStream());
+ validateGetAvailableServersResponse(getAvailableServersResponseMessage);
+
+ validateStats(messagesReceived + 1, messagesSent + 1,
+ bytesReceived + getAvailableServersRequestMessage.getSerializedSize(),
+ bytesSent + getAvailableServersResponseMessage.getSerializedSize(),
+ clientConnectionStarts, clientConnectionTerminations + 1);
}
}
@@ -187,8 +169,8 @@ public class LocatorConnectionDUnitTest extends JUnit4CacheTestCase {
InternalDistributedSystem distributedSystem =
(InternalDistributedSystem) Locator.getLocator().getDistributedSystem();
- Statistics[] protobufServerStats =
- distributedSystem.findStatisticsByType(distributedSystem.findType("ProtobufServerStats"));
+ Statistics[] protobufServerStats = distributedSystem.findStatisticsByType(
+ distributedSystem.findType(ProtobufClientStatistics.PROTOBUF_STATS_NAME));
assertEquals(1, protobufServerStats.length);
return protobufServerStats[0];
}
diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
index 9b96dab..5f724d6 100644
--- a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
+++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/operations/GetAvailableServersOperationHandlerJUnitTest.java
@@ -25,6 +25,7 @@ import org.apache.geode.internal.protocol.protobuf.Result;
import org.apache.geode.internal.protocol.protobuf.ServerAPI;
import org.apache.geode.internal.protocol.protobuf.ServerAPI.GetAvailableServersResponse;
import org.apache.geode.internal.protocol.protobuf.Success;
+import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics;
import org.apache.geode.internal.protocol.protobuf.utilities.ProtobufRequestUtilities;
import org.apache.geode.test.junit.categories.UnitTest;
import org.junit.Before;
@@ -74,8 +75,9 @@ public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandl
ServerAPI.GetAvailableServersRequest getAvailableServersRequest =
ProtobufRequestUtilities.createGetAvailableServersRequest();
- Result operationHandlerResult = operationHandler.process(serializationServiceStub,
- getAvailableServersRequest, new MessageExecutionContext(internalLocatorMock));
+ Result operationHandlerResult =
+ operationHandler.process(serializationServiceStub, getAvailableServersRequest,
+ new MessageExecutionContext(internalLocatorMock, new NoOpStatistics()));
assertTrue(operationHandlerResult instanceof Success);
ValidateGetAvailableServersResponse(
(GetAvailableServersResponse) operationHandlerResult.getMessage());
@@ -88,8 +90,9 @@ public class GetAvailableServersOperationHandlerJUnitTest extends OperationHandl
ServerAPI.GetAvailableServersRequest getAvailableServersRequest =
ProtobufRequestUtilities.createGetAvailableServersRequest();
- Result operationHandlerResult = operationHandler.process(serializationServiceStub,
- getAvailableServersRequest, new MessageExecutionContext(internalLocatorMock));
+ Result operationHandlerResult =
+ operationHandler.process(serializationServiceStub, getAvailableServersRequest,
+ new MessageExecutionContext(internalLocatorMock, new NoOpStatistics()));
assertTrue(operationHandlerResult instanceof Success);
GetAvailableServersResponse availableServersResponse =
(GetAvailableServersResponse) operationHandlerResult.getMessage();
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].