You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/08/15 17:49:50 UTC

geode git commit: GEODE-3412: Add simple authentication flow to protobuf protocol. This now closes #707

Repository: geode
Updated Branches:
  refs/heads/develop 190cfed88 -> a7a197d63


GEODE-3412: Add simple authentication flow to protobuf protocol. This now closes #707

This change adds simple username/password validation to the protobuf protocol.
It also adds a new configuration parameter, the geode.protocol-authentication-mode system property (default "NOOP"), to specify the type of authentication required.

Signed-off-by: Galen O'Sullivan <go...@pivotal.io>


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/a7a197d6
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/a7a197d6
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/a7a197d6

Branch: refs/heads/develop
Commit: a7a197d633a20ee3a2161d47389581858745c1cc
Parents: 190cfed
Author: Brian Rowe <br...@pivotal.io>
Authored: Thu Aug 10 11:16:25 2017 -0700
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Tue Aug 15 10:49:33 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/CacheServerImpl.java   |  10 +-
 .../cache/tier/sockets/AcceptorImpl.java        |  39 ++---
 .../GenericProtocolServerConnection.java        |  13 +-
 .../tier/sockets/ServerConnectionFactory.java   |  86 +++++++----
 .../geode/security/NoOpStreamAuthenticator.java |  45 ++++++
 .../geode/security/StreamAuthenticator.java     |  52 +++++++
 ...rg.apache.geode.security.StreamAuthenticator |   1 +
 .../tier/sockets/AcceptorImplJUnitTest.java     |  25 ++--
 .../GenericProtocolServerConnectionTest.java    |   2 +-
 .../sockets/ServerConnectionFactoryTest.java    |  53 ++++---
 .../tier/sockets/ServerConnectionTest.java      |   4 +-
 .../protobuf/ProtobufSimpleAuthenticator.java   |  63 ++++++++
 .../src/main/proto/authentication_API.proto     |  26 ++++
 .../src/main/proto/clientProtocol.proto         |   1 -
 ...rg.apache.geode.security.StreamAuthenticator |   1 +
 .../protocol/AuthenticationIntegrationTest.java | 142 +++++++++++++++++++
 .../ProtobufSimpleAuthenticatorJUnitTest.java   | 111 +++++++++++++++
 17 files changed, 584 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
index 7d4b6d4..bcd8b32 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.geode.internal.cache.tier.sockets.ServerConnectionFactory;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
@@ -91,6 +92,13 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
 
   private final SecurityService securityService;
 
+  /**
+   * The server connection factory, which provides either a
+   * {@link org.apache.geode.internal.cache.tier.sockets.LegacyServerConnection} or a new
+   * {@link org.apache.geode.internal.cache.tier.sockets.GenericProtocolServerConnection}
+   */
+  private final ServerConnectionFactory serverConnectionFactory = new ServerConnectionFactory();
+
   /** The acceptor that does the actual serving */
   private volatile AcceptorImpl acceptor;
 
@@ -343,7 +351,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution
         getSocketBufferSize(), getMaximumTimeBetweenPings(), this.cache, getMaxConnections(),
         getMaxThreads(), getMaximumMessageCount(), getMessageTimeToLive(), this.loadMonitor,
         overflowAttributesList, this.isGatewayReceiver, this.gatewayTransportFilters,
-        this.tcpNoDelay);
+        this.tcpNoDelay, serverConnectionFactory);
 
     this.acceptor.start();
     this.advisor.handshake();

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index d18fa6a..2e33af8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -303,6 +303,8 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
 
   private final SecurityService securityService;
 
+  private final ServerConnectionFactory serverConnectionFactory;
+
   /**
    * Initializes this acceptor thread to listen for connections on the given port.
    *
@@ -324,13 +326,15 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
       int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache,
       int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive,
       ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver,
-      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay) throws IOException {
+      List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay,
+      ServerConnectionFactory serverConnectionFactory) throws IOException {
     this.securityService = internalCache.getSecurityService();
     this.bindHostName = calcBindHostName(internalCache, bindHostName);
     this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener;
     this.notifyBySubscription = notifyBySubscription;
     this.isGatewayReceiver = isGatewayReceiver;
     this.gatewayTransportFilters = transportFilter;
+    this.serverConnectionFactory = serverConnectionFactory;
     {
       int tmp_maxConnections = maxConnections;
       if (tmp_maxConnections < MINIMUM_MAX_CONNECTIONS) {
@@ -1243,13 +1247,13 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
 
       crHelper.checkCancelInProgress(null); // throws
 
-      Socket s = null;
+      Socket socket = null;
       try {
-        s = serverSock.accept();
+        socket = serverSock.accept();
         crHelper.checkCancelInProgress(null); // throws
 
         // Optionally enable SO_KEEPALIVE in the OS network protocol.
-        s.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
+        socket.setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
         // The synchronization below was added to prevent close from being
         // called
@@ -1265,22 +1269,22 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
 
         synchronized (this.syncLock) {
           if (!isRunning()) {
-            closeSocket(s);
+            closeSocket(socket);
             break;
           }
         }
         this.loggedAcceptError = false;
 
-        handOffNewClientConnection(s);
+        handOffNewClientConnection(socket, serverConnectionFactory);
       } catch (InterruptedIOException e) { // Solaris only
-        closeSocket(s);
+        closeSocket(socket);
         if (isRunning()) {
           if (logger.isDebugEnabled()) {
             logger.debug("Aborted due to interrupt: {}", e);
           }
         }
       } catch (IOException e) {
-        closeSocket(s);
+        closeSocket(socket);
         if (isRunning()) {
           if (!this.loggedAcceptError) {
             this.loggedAcceptError = true;
@@ -1291,10 +1295,10 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
           // try {Thread.sleep(3000);} catch (InterruptedException ie) {}
         }
       } catch (CancelException e) {
-        closeSocket(s);
+        closeSocket(socket);
         throw e;
       } catch (Exception e) {
-        closeSocket(s);
+        closeSocket(socket);
         if (isRunning()) {
           logger.fatal(LocalizedMessage
               .create(LocalizedStrings.AcceptorImpl_CACHE_SERVER_UNEXPECTED_EXCEPTION, e));
@@ -1303,20 +1307,20 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
     }
   }
 
-
   /**
    * Hand off a new client connection to the thread pool that processes handshakes. If all the
    * threads in this pool are busy then the hand off will block until a thread is available. This
    * blocking is good because it will throttle the rate at which we create new connections.
    */
-  private void handOffNewClientConnection(final Socket s) {
+  private void handOffNewClientConnection(final Socket socket,
+      final ServerConnectionFactory serverConnectionFactory) {
     try {
       this.stats.incAcceptsInProgress();
       this.hsPool.execute(new Runnable() {
         public void run() {
           boolean finished = false;
           try {
-            handleNewClientConnection(s);
+            handleNewClientConnection(socket, serverConnectionFactory);
             finished = true;
           } catch (RegionDestroyedException rde) {
             // aborted due to disconnect - bug 42273
@@ -1343,7 +1347,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
             }
           } finally {
             if (!finished) {
-              closeSocket(s);
+              closeSocket(socket);
             }
             if (isRunning()) {
               AcceptorImpl.this.stats.decAcceptsInProgress();
@@ -1352,7 +1356,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
         }
       });
     } catch (RejectedExecutionException rejected) {
-      closeSocket(s);
+      closeSocket(socket);
       if (isRunning()) {
         this.stats.decAcceptsInProgress();
         logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected));
@@ -1389,7 +1393,8 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
     return this.clientServerCnxCount.get();
   }
 
-  protected void handleNewClientConnection(final Socket socket) throws IOException {
+  protected void handleNewClientConnection(final Socket socket,
+      final ServerConnectionFactory serverConnectionFactory) throws IOException {
     // Read the first byte. If this socket is being used for 'client to server'
     // communication, create a ServerConnection. If this socket is being used
     // for 'server to client' communication, send it to the CacheClientNotifier
@@ -1468,7 +1473,7 @@ public class AcceptorImpl extends Acceptor implements Runnable, CommBufferPool {
       }
     }
 
-    ServerConnection serverConn = ServerConnectionFactory.makeServerConnection(socket, this.cache,
+    ServerConnection serverConn = serverConnectionFactory.makeServerConnection(socket, this.cache,
         this.crHelper, this.stats, AcceptorImpl.handShakeTimeout, this.socketBufferSize,
         communicationModeStr, communicationMode, this, this.securityService);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
index 76b3b7e..7c8fb5c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnection.java
@@ -19,6 +19,7 @@ import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.security.SecurityManager;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,6 +32,8 @@ import java.net.Socket;
 public class GenericProtocolServerConnection extends ServerConnection {
   // The new protocol lives in a separate module and gets loaded when this class is instantiated.
   private final ClientProtocolMessageHandler messageHandler;
+  private final SecurityManager securityManager;
+  private final StreamAuthenticator authenticator;
 
   /**
    * Creates a new <code>GenericProtocolServerConnection</code> that processes messages received
@@ -39,10 +42,12 @@ public class GenericProtocolServerConnection extends ServerConnection {
   public GenericProtocolServerConnection(Socket s, InternalCache c, CachedRegionHelper helper,
       CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
       byte communicationMode, Acceptor acceptor, ClientProtocolMessageHandler newClientProtocol,
-      SecurityService securityService) {
+      SecurityService securityService, StreamAuthenticator authenticator) {
     super(s, c, helper, stats, hsTimeout, socketBufferSize, communicationModeStr, communicationMode,
         acceptor, securityService);
+    securityManager = securityService.getSecurityManager();
     this.messageHandler = newClientProtocol;
+    this.authenticator = authenticator;
   }
 
   @Override
@@ -52,7 +57,11 @@ public class GenericProtocolServerConnection extends ServerConnection {
       InputStream inputStream = socket.getInputStream();
       OutputStream outputStream = socket.getOutputStream();
 
-      messageHandler.receiveMessage(inputStream, outputStream, this.getCache());
+      if (!authenticator.isAuthenticated()) {
+        authenticator.receiveMessage(inputStream, outputStream, securityManager);
+      } else {
+        messageHandler.receiveMessage(inputStream, outputStream, this.getCache());
+      }
     } catch (IOException e) {
       logger.warn(e);
       this.setFlagProcessMessagesAsFalse(); // TODO: better shutdown.

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
index ad13b78..1d53297 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactory.java
@@ -22,59 +22,89 @@ import org.apache.geode.internal.security.SecurityService;
 
 import java.io.IOException;
 import java.net.Socket;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.ServiceLoader;
-import javax.management.ServiceNotFoundException;
 
 /**
  * Creates instances of ServerConnection based on the connection mode provided.
  */
 public class ServerConnectionFactory {
-  private static ClientProtocolMessageHandler protobufProtocolHandler;
-  private static final Object protocolLoadLock = new Object();
+  private ClientProtocolMessageHandler protobufProtocolHandler;
+  private Map<String, Class<? extends StreamAuthenticator>> authenticators = null;
 
-  private static ClientProtocolMessageHandler findClientProtocolMessageHandler() {
+  public ServerConnectionFactory() {}
+
+  private synchronized void initializeAuthenticatorsMap() {
+    if (authenticators != null) {
+      return;
+    }
+    authenticators = new HashMap<>();
+    ServiceLoader<StreamAuthenticator> loader = ServiceLoader.load(StreamAuthenticator.class);
+    for (StreamAuthenticator streamAuthenticator : loader) {
+      authenticators.put(streamAuthenticator.implementationID(), streamAuthenticator.getClass());
+    }
+  }
+
+  private synchronized ClientProtocolMessageHandler initializeMessageHandler() {
     if (protobufProtocolHandler != null) {
       return protobufProtocolHandler;
     }
+    ServiceLoader<ClientProtocolMessageHandler> loader =
+        ServiceLoader.load(ClientProtocolMessageHandler.class);
+    Iterator<ClientProtocolMessageHandler> iterator = loader.iterator();
 
-    synchronized (protocolLoadLock) {
-      if (protobufProtocolHandler != null) {
-        return protobufProtocolHandler;
-      }
-
-      ServiceLoader<ClientProtocolMessageHandler> loader =
-          ServiceLoader.load(ClientProtocolMessageHandler.class);
-      Iterator<ClientProtocolMessageHandler> iterator = loader.iterator();
-
-      if (!iterator.hasNext()) {
-        throw new ServiceLoadingFailureException(
-            "ClientProtocolMessageHandler implementation not found in JVM");
-      }
+    if (!iterator.hasNext()) {
+      throw new ServiceLoadingFailureException(
+          "There is no ClientProtocolMessageHandler implementation found in JVM");
+    }
 
-      ClientProtocolMessageHandler returnValue = iterator.next();
+    protobufProtocolHandler = iterator.next();
+    return protobufProtocolHandler;
+  }
 
-      if (iterator.hasNext()) {
+  private StreamAuthenticator findStreamAuthenticator(String implementationID) {
+    if (authenticators == null) {
+      initializeAuthenticatorsMap();
+    }
+    Class<? extends StreamAuthenticator> streamAuthenticatorClass =
+        authenticators.get(implementationID);
+    if (streamAuthenticatorClass == null) {
+      throw new ServiceLoadingFailureException(
+          "Could not find implementation for StreamAuthenticator with implementation ID "
+              + implementationID);
+    } else {
+      try {
+        return streamAuthenticatorClass.newInstance();
+      } catch (InstantiationException | IllegalAccessException e) {
         throw new ServiceLoadingFailureException(
-            "Multiple service implementations found for ClientProtocolMessageHandler");
+            "Unable to instantiate authenticator for ID " + implementationID, e);
       }
+    }
+  }
 
-      return returnValue;
+  private ClientProtocolMessageHandler getClientProtocolMessageHandler() {
+    if (protobufProtocolHandler == null) {
+      initializeMessageHandler();
     }
+    return protobufProtocolHandler;
   }
 
-  public static ServerConnection makeServerConnection(Socket s, InternalCache c,
-      CachedRegionHelper helper, CacheServerStats stats, int hsTimeout, int socketBufferSize,
-      String communicationModeStr, byte communicationMode, Acceptor acceptor,
-      SecurityService securityService) throws IOException {
+  public ServerConnection makeServerConnection(Socket s, InternalCache c, CachedRegionHelper helper,
+      CacheServerStats stats, int hsTimeout, int socketBufferSize, String communicationModeStr,
+      byte communicationMode, Acceptor acceptor, SecurityService securityService)
+      throws IOException {
     if (communicationMode == Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL) {
       if (!Boolean.getBoolean("geode.feature-protobuf-protocol")) {
         throw new IOException("Acceptor received unknown communication mode: " + communicationMode);
       } else {
-        protobufProtocolHandler = findClientProtocolMessageHandler();
+        String authenticationMode =
+            System.getProperty("geode.protocol-authentication-mode", "NOOP");
+
         return new GenericProtocolServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize,
-            communicationModeStr, communicationMode, acceptor, protobufProtocolHandler,
-            securityService);
+            communicationModeStr, communicationMode, acceptor, getClientProtocolMessageHandler(),
+            securityService, findStreamAuthenticator(authenticationMode));
       }
     } else {
       return new LegacyServerConnection(s, c, helper, stats, hsTimeout, socketBufferSize,

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/main/java/org/apache/geode/security/NoOpStreamAuthenticator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/security/NoOpStreamAuthenticator.java b/geode-core/src/main/java/org/apache/geode/security/NoOpStreamAuthenticator.java
new file mode 100644
index 0000000..bca1ec2
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/security/NoOpStreamAuthenticator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.security.SecurityManager;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * An implementation of {@link StreamAuthenticator} that doesn't use its parameters and always
+ * returns true.
+ */
+public class NoOpStreamAuthenticator implements StreamAuthenticator {
+
+
+  @Override
+  public void receiveMessage(InputStream inputStream, OutputStream outputStream,
+      SecurityManager securityManager) throws IOException {
+    // this method needs to do nothing as it is a pass-through implementation
+  }
+
+  @Override
+  public boolean isAuthenticated() {
+    return true;
+  }
+
+  @Override
+  public String implementationID() {
+    return "NOOP";
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/main/java/org/apache/geode/security/StreamAuthenticator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/security/StreamAuthenticator.java b/geode-core/src/main/java/org/apache/geode/security/StreamAuthenticator.java
new file mode 100644
index 0000000..51cbf2e
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/security/StreamAuthenticator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.security.SecurityManager;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Implementers of this interface do some message passing over a socket to authenticate a client,
+ * then hand off the connection to the protocol that will talk on the socket.
+ *
+ * If authentication fails, an implementer may continue to wait for another valid authentication
+ * exchange.
+ */
+public interface StreamAuthenticator {
+  /**
+   *
+   * @param inputStream to read auth messages from.
+   * @param outputStream to send messages to.
+   * @param securityManager can be used for validating credentials against.
+   * @throws IOException if EOF or if invalid input is received.
+   */
+  void receiveMessage(InputStream inputStream, OutputStream outputStream,
+      SecurityManager securityManager) throws IOException;
+
+  /**
+   * Until authentication is complete, isAuthenticated() must return false, and the socket will
+   * always be passed to the StreamAuthenticator. Once authentication succeeds, calls to this
+   * function must always return true.
+   */
+  boolean isAuthenticated();
+
+  /**
+   * @return a unique identifier for this particular implementation (NOOP, PASSTHROUGH, etc.)
+   */
+  String implementationID();
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/main/resources/META-INF/services/org.apache.geode.security.StreamAuthenticator
----------------------------------------------------------------------
diff --git a/geode-core/src/main/resources/META-INF/services/org.apache.geode.security.StreamAuthenticator b/geode-core/src/main/resources/META-INF/services/org.apache.geode.security.StreamAuthenticator
new file mode 100644
index 0000000..3b93815
--- /dev/null
+++ b/geode-core/src/main/resources/META-INF/services/org.apache.geode.security.StreamAuthenticator
@@ -0,0 +1 @@
+org.apache.geode.security.NoOpStreamAuthenticator
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
index 1fe5980..6c46eff 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
@@ -14,11 +14,6 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
-import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.server.CacheServer;
@@ -40,6 +35,11 @@ import java.net.BindException;
 import java.util.Collections;
 import java.util.Properties;
 
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 @Category({IntegrationTest.class, ClientServerTest.class})
 public class AcceptorImplJUnitTest {
 
@@ -74,12 +74,14 @@ public class AcceptorImplJUnitTest {
       int port2 = freeTCPPorts[1];
 
 
+      ServerConnectionFactory serverConnectionFactory = new ServerConnectionFactory();
       try {
         new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+            serverConnectionFactory);
         fail("Expected an IllegalArgumentExcption due to max conns < min pool size");
       } catch (IllegalArgumentException expected) {
       }
@@ -89,7 +91,7 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0,
             CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
             CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST,
-            CacheServer.DEFAULT_TCP_NO_DELAY);
+            CacheServer.DEFAULT_TCP_NO_DELAY, serverConnectionFactory);
         fail("Expected an IllegalArgumentExcption due to max conns of zero");
       } catch (IllegalArgumentException expected) {
       }
@@ -99,12 +101,14 @@ public class AcceptorImplJUnitTest {
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+            serverConnectionFactory);
         a2 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
             CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
-            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+            null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+            serverConnectionFactory);
         fail("Expecetd a BindException while attaching to the same port");
       } catch (BindException expected) {
       }
@@ -113,7 +117,8 @@ public class AcceptorImplJUnitTest {
           CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
           AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
-          null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+          null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+          serverConnectionFactory);
       assertEquals(port2, a3.getPort());
       InternalDistributedSystem isystem =
           (InternalDistributedSystem) this.cache.getDistributedSystem();

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
index 3bfcd8b..3dcf343 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/GenericProtocolServerConnectionTest.java
@@ -57,6 +57,6 @@ public class GenericProtocolServerConnectionTest {
     return new GenericProtocolServerConnection(socketMock, mock(InternalCache.class),
         mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "",
         Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL, mock(AcceptorImpl.class), clientProtocolMock,
-        mock(SecurityService.class));
+        mock(SecurityService.class), new NoOpStreamAuthenticator());
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
index b3c3e32..cffa05f 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionFactoryTest.java
@@ -15,13 +15,14 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
-import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.Acceptor;
 import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 import org.junit.experimental.categories.Category;
 
 import java.io.IOException;
@@ -36,18 +37,22 @@ import static org.mockito.Mockito.when;
  * We don't test the path where the service providing protobufProtocolHandler is actually present,
  * because it lives outside this module, and all the integration tests from that module will test
  * the newclient protocol happy path.
- *
+ * <p>
  * What we are concerned with is making sure that everything stays the same when the feature flag
  * isn't set, and that we at least try to load the service when the feature flag is true.
  */
 @Category(UnitTest.class)
 public class ServerConnectionFactoryTest {
+
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
   /**
    * Safeguard that we won't create the new client protocol object unless the feature flag is
    * enabled.
    */
   @Test(expected = IOException.class)
-  public void newClientProtocolFailsWithoutSystemPropertySet() throws Exception {
+  public void newClientProtocolFailsWithoutSystemPropertySet() throws IOException {
     ServerConnection serverConnection =
         serverConnectionMockedExceptForCommunicationMode(Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL);
 
@@ -58,14 +63,10 @@ public class ServerConnectionFactoryTest {
    *         module, and when this unit test is run, that module won't be present.
    */
   @Test(expected = ServiceLoadingFailureException.class)
-  public void newClientProtocolFailsWithSystemPropertySet() throws Exception {
-    try {
-      System.setProperty("geode.feature-protobuf-protocol", "true");
-      ServerConnection serverConnection = serverConnectionMockedExceptForCommunicationMode(
-          Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL);
-    } finally {
-      System.clearProperty("geode.feature-protobuf-protocol");
-    }
+  public void newClientProtocolFailsWithSystemPropertySet() throws IOException {
+    System.setProperty("geode.feature-protobuf-protocol", "true");
+    ServerConnection serverConnection =
+        serverConnectionMockedExceptForCommunicationMode(Acceptor.PROTOBUF_CLIENT_SERVER_PROTOCOL);
   }
 
   @Test
@@ -86,29 +87,25 @@ public class ServerConnectionFactoryTest {
   @Test
   public void makeServerConnectionForOldProtocolWithFeatureFlagEnabled() throws IOException {
     System.setProperty("geode.feature-protobuf-protocol", "true");
-    try {
-      byte[] communicationModes =
-          new byte[] {Acceptor.CLIENT_TO_SERVER, Acceptor.PRIMARY_SERVER_TO_CLIENT,
-              Acceptor.SECONDARY_SERVER_TO_CLIENT, Acceptor.GATEWAY_TO_GATEWAY,
-              Acceptor.MONITOR_TO_SERVER, Acceptor.SUCCESSFUL_SERVER_TO_CLIENT,
-              Acceptor.UNSUCCESSFUL_SERVER_TO_CLIENT, Acceptor.CLIENT_TO_SERVER_FOR_QUEUE,};
-
-      for (byte communicationMode : communicationModes) {
-        ServerConnection serverConnection =
-            serverConnectionMockedExceptForCommunicationMode(communicationMode);
-        assertTrue(serverConnection instanceof LegacyServerConnection);
-      }
-    } finally {
-      System.clearProperty("geode.feature-protobuf-protocol");
+    byte[] communicationModes =
+        new byte[] {Acceptor.CLIENT_TO_SERVER, Acceptor.PRIMARY_SERVER_TO_CLIENT,
+            Acceptor.SECONDARY_SERVER_TO_CLIENT, Acceptor.GATEWAY_TO_GATEWAY,
+            Acceptor.MONITOR_TO_SERVER, Acceptor.SUCCESSFUL_SERVER_TO_CLIENT,
+            Acceptor.UNSUCCESSFUL_SERVER_TO_CLIENT, Acceptor.CLIENT_TO_SERVER_FOR_QUEUE,};
+
+    for (byte communicationMode : communicationModes) {
+      ServerConnection serverConnection =
+          serverConnectionMockedExceptForCommunicationMode(communicationMode);
+      assertTrue(serverConnection instanceof LegacyServerConnection);
     }
   }
 
-  private static ServerConnection serverConnectionMockedExceptForCommunicationMode(
-      byte communicationMode) throws IOException {
+  private ServerConnection serverConnectionMockedExceptForCommunicationMode(byte communicationMode)
+      throws IOException {
     Socket socketMock = mock(Socket.class);
     when(socketMock.getInetAddress()).thenReturn(InetAddress.getByName("localhost"));
 
-    return ServerConnectionFactory.makeServerConnection(socketMock, mock(InternalCache.class),
+    return new ServerConnectionFactory().makeServerConnection(socketMock, mock(InternalCache.class),
         mock(CachedRegionHelper.class), mock(CacheServerStats.class), 0, 0, "", communicationMode,
         mock(AcceptorImpl.class), mock(SecurityService.class));
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
index 7399a72..2aa8995 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ServerConnectionTest.java
@@ -83,8 +83,8 @@ public class ServerConnectionTest {
     InternalCache cache = mock(InternalCache.class);
     SecurityService securityService = mock(SecurityService.class);
 
-    serverConnection = ServerConnectionFactory.makeServerConnection(socket, cache, null, null, 0, 0,
-        null, Acceptor.PRIMARY_SERVER_TO_CLIENT, acceptor, securityService);
+    serverConnection = new ServerConnectionFactory().makeServerConnection(socket, cache, null, null,
+        0, 0, null, Acceptor.PRIMARY_SERVER_TO_CLIENT, acceptor, securityService);
     MockitoAnnotations.initMocks(this);
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufSimpleAuthenticator.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufSimpleAuthenticator.java b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufSimpleAuthenticator.java
new file mode 100644
index 0000000..59c61e2
--- /dev/null
+++ b/geode-protobuf/src/main/java/org/apache/geode/protocol/protobuf/ProtobufSimpleAuthenticator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.protocol.protobuf;
+
+import org.apache.geode.security.StreamAuthenticator;
+import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.geode.security.SecurityManager;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Properties;
+
+public class ProtobufSimpleAuthenticator implements StreamAuthenticator {
+  private boolean authenticated;
+
+  @Override
+  public void receiveMessage(InputStream inputStream, OutputStream outputStream,
+      SecurityManager securityManager) throws IOException {
+    AuthenticationAPI.SimpleAuthenticationRequest authenticationRequest =
+        AuthenticationAPI.SimpleAuthenticationRequest.parseDelimitedFrom(inputStream);
+    if (authenticationRequest == null) {
+      throw new EOFException();
+    }
+
+    Properties properties = new Properties();
+    properties.setProperty("username", authenticationRequest.getUsername());
+    properties.setProperty("password", authenticationRequest.getPassword());
+
+    try {
+      Object principal = securityManager.authenticate(properties);
+      authenticated = principal != null;
+    } catch (AuthenticationFailedException e) {
+      authenticated = false;
+    }
+
+    AuthenticationAPI.SimpleAuthenticationResponse.newBuilder().setAuthenticated(authenticated)
+        .build().writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public boolean isAuthenticated() {
+    return authenticated;
+  }
+
+  @Override
+  public String implementationID() {
+    return "SIMPLE";
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-protobuf/src/main/proto/authentication_API.proto
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/proto/authentication_API.proto b/geode-protobuf/src/main/proto/authentication_API.proto
new file mode 100644
index 0000000..0e651bd
--- /dev/null
+++ b/geode-protobuf/src/main/proto/authentication_API.proto
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+syntax = "proto3";
+package org.apache.geode.protocol.protobuf;
+
+message SimpleAuthenticationRequest {
+    string username = 1;
+    string password = 2;
+}
+
+message SimpleAuthenticationResponse {
+    bool authenticated = 1;
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-protobuf/src/main/proto/clientProtocol.proto
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/proto/clientProtocol.proto b/geode-protobuf/src/main/proto/clientProtocol.proto
index 8203c43..91783b2 100644
--- a/geode-protobuf/src/main/proto/clientProtocol.proto
+++ b/geode-protobuf/src/main/proto/clientProtocol.proto
@@ -56,7 +56,6 @@ message Request {
         GetAvailableServersRequest getAvailableServersRequest = 42;
         GetRegionNamesRequest getRegionNamesRequest = 43;
         GetRegionRequest getRegionRequest = 44;
-
     }
 }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.security.StreamAuthenticator
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.security.StreamAuthenticator b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.security.StreamAuthenticator
new file mode 100644
index 0000000..45e4eea
--- /dev/null
+++ b/geode-protobuf/src/main/resources/META-INF/services/org.apache.geode.security.StreamAuthenticator
@@ -0,0 +1 @@
+org.apache.geode.protocol.protobuf.ProtobufSimpleAuthenticator
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-protobuf/src/test/java/org/apache/geode/protocol/AuthenticationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/AuthenticationIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/AuthenticationIntegrationTest.java
new file mode 100644
index 0000000..794375e
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/AuthenticationIntegrationTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.protocol;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.protocol.protobuf.AuthenticationAPI;
+import org.apache.geode.protocol.protobuf.ClientProtocol;
+import org.apache.geode.protocol.protobuf.ProtobufSerializationService;
+import org.apache.geode.protocol.protobuf.RegionAPI;
+import org.apache.geode.protocol.protobuf.serializer.ProtobufProtocolSerializer;
+import org.apache.geode.security.SecurityManager;
+import org.apache.geode.serialization.registry.exception.CodecAlreadyRegisteredForTypeException;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.awaitility.Awaitility;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(IntegrationTest.class)
+public class AuthenticationIntegrationTest {
+
+  private static final String TEST_USERNAME = "bob";
+  private static final String TEST_PASSWORD = "bobspassword";
+
+  @Rule
+  public final RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
+
+  private Cache cache;
+  private int cacheServerPort;
+  private CacheServer cacheServer;
+  private Socket socket;
+  private OutputStream outputStream;
+  private ProtobufSerializationService serializationService;
+  private InputStream inputStream;
+  private ProtobufProtocolSerializer protobufProtocolSerializer;
+  private Object securityPrincipal;
+  private SecurityManager mockSecurityManager;
+
+  public void setUp(String authenticationMode)
+      throws IOException, CodecAlreadyRegisteredForTypeException {
+    Properties expectedAuthProperties = new Properties();
+    expectedAuthProperties.setProperty("username", TEST_USERNAME);
+    expectedAuthProperties.setProperty("password", TEST_PASSWORD);
+
+    securityPrincipal = new Object();
+    mockSecurityManager = mock(SecurityManager.class);
+    when(mockSecurityManager.authenticate(expectedAuthProperties)).thenReturn(securityPrincipal);
+    when(mockSecurityManager.authorize(same(securityPrincipal), any())).thenReturn(true);
+
+    Properties properties = new Properties();
+    CacheFactory cacheFactory = new CacheFactory(properties);
+    cacheFactory.set("mcast-port", "0"); // sometimes it isn't due to other tests.
+
+    cacheFactory.setSecurityManager(mockSecurityManager);
+    cache = cacheFactory.create();
+
+    cacheServer = cache.addCacheServer();
+    cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    cacheServer.setPort(cacheServerPort);
+    cacheServer.start();
+
+
+    System.setProperty("geode.feature-protobuf-protocol", "true");
+    System.setProperty("geode.protocol-authentication-mode", authenticationMode);
+    socket = new Socket("localhost", cacheServerPort);
+
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+    outputStream = socket.getOutputStream();
+    inputStream = socket.getInputStream();
+    outputStream.write(110);
+
+    serializationService = new ProtobufSerializationService();
+    protobufProtocolSerializer = new ProtobufProtocolSerializer();
+  }
+
+  @Test
+  public void noopAuthenticationSucceeds() throws Exception {
+    setUp("NOOP");
+    ClientProtocol.Message getRegionsMessage =
+        ClientProtocol.Message.newBuilder().setRequest(ClientProtocol.Request.newBuilder()
+            .setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder())).build();
+    protobufProtocolSerializer.serialize(getRegionsMessage, outputStream);
+
+    ClientProtocol.Message regionsResponse = protobufProtocolSerializer.deserialize(inputStream);
+    assertEquals(ClientProtocol.Response.ResponseAPICase.GETREGIONNAMESRESPONSE,
+        regionsResponse.getResponse().getResponseAPICase());
+  }
+
+  @Test
+  public void simpleAuthenticationSucceeds() throws Exception {
+    setUp("SIMPLE");
+    AuthenticationAPI.SimpleAuthenticationRequest authenticationRequest =
+        AuthenticationAPI.SimpleAuthenticationRequest.newBuilder().setUsername(TEST_USERNAME)
+            .setPassword(TEST_PASSWORD).build();
+    authenticationRequest.writeDelimitedTo(outputStream);
+
+    AuthenticationAPI.SimpleAuthenticationResponse authenticationResponse =
+        AuthenticationAPI.SimpleAuthenticationResponse.parseDelimitedFrom(inputStream);
+    assertTrue(authenticationResponse.getAuthenticated());
+
+    ClientProtocol.Message getRegionsMessage =
+        ClientProtocol.Message.newBuilder().setRequest(ClientProtocol.Request.newBuilder()
+            .setGetRegionNamesRequest(RegionAPI.GetRegionNamesRequest.newBuilder())).build();
+    protobufProtocolSerializer.serialize(getRegionsMessage, outputStream);
+
+    ClientProtocol.Message regionsResponse = protobufProtocolSerializer.deserialize(inputStream);
+    assertEquals(ClientProtocol.Response.ResponseAPICase.GETREGIONNAMESRESPONSE,
+        regionsResponse.getResponse().getResponseAPICase());
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/a7a197d6/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufSimpleAuthenticatorJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufSimpleAuthenticatorJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufSimpleAuthenticatorJUnitTest.java
new file mode 100644
index 0000000..3d16f5e
--- /dev/null
+++ b/geode-protobuf/src/test/java/org/apache/geode/protocol/protobuf/ProtobufSimpleAuthenticatorJUnitTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.protocol.protobuf;
+
+import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.geode.security.ResourcePermission;
+import org.apache.geode.security.SecurityManager;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(UnitTest.class)
+public class ProtobufSimpleAuthenticatorJUnitTest {
+  private static final String TEST_USERNAME = "user1";
+  private static final String TEST_PASSWORD = "hunter2";
+  private ByteArrayInputStream byteArrayInputStream; // initialized with an incoming request in
+                                                     // setUp.
+  private ByteArrayOutputStream byteArrayOutputStream;
+  private ProtobufSimpleAuthenticator protobufSimpleAuthenticator;
+  private SecurityManager mockSecurityManager;
+  private Object securityPrincipal;
+
+  @Before
+  public void setUp() throws IOException {
+    AuthenticationAPI.SimpleAuthenticationRequest basicAuthenticationRequest =
+        AuthenticationAPI.SimpleAuthenticationRequest.newBuilder().setUsername(TEST_USERNAME)
+            .setPassword(TEST_PASSWORD).build();
+
+    Properties expectedAuthProperties = new Properties();
+    expectedAuthProperties.setProperty("username", TEST_USERNAME);
+    expectedAuthProperties.setProperty("password", TEST_PASSWORD);
+
+    ByteArrayOutputStream messageStream = new ByteArrayOutputStream();
+    basicAuthenticationRequest.writeDelimitedTo(messageStream);
+    byteArrayInputStream = new ByteArrayInputStream(messageStream.toByteArray());
+    byteArrayOutputStream = new ByteArrayOutputStream();
+
+    securityPrincipal = new Object();
+    mockSecurityManager = mock(SecurityManager.class);
+    when(mockSecurityManager.authenticate(expectedAuthProperties)).thenReturn(securityPrincipal);
+    when(mockSecurityManager.authorize(same(securityPrincipal), any())).thenReturn(true);
+
+    protobufSimpleAuthenticator = new ProtobufSimpleAuthenticator();
+  }
+
+  @Test
+  public void successfulAuthentication() throws IOException {
+    assertFalse(protobufSimpleAuthenticator.isAuthenticated());
+
+    protobufSimpleAuthenticator.receiveMessage(byteArrayInputStream, byteArrayOutputStream,
+        mockSecurityManager);
+
+    AuthenticationAPI.SimpleAuthenticationResponse simpleAuthenticationResponse =
+        getSimpleAuthenticationResponse(byteArrayOutputStream);
+
+    assertTrue(simpleAuthenticationResponse.getAuthenticated());
+    assertTrue(protobufSimpleAuthenticator.isAuthenticated());
+  }
+
+  @Test
+  public void authenticationFails() throws IOException {
+    assertFalse(protobufSimpleAuthenticator.isAuthenticated());
+
+    Properties expectedAuthProperties = new Properties();
+    expectedAuthProperties.setProperty("username", TEST_USERNAME);
+    expectedAuthProperties.setProperty("password", TEST_PASSWORD);
+    when(mockSecurityManager.authenticate(expectedAuthProperties))
+        .thenThrow(new AuthenticationFailedException("BOOM!"));
+
+    protobufSimpleAuthenticator.receiveMessage(byteArrayInputStream, byteArrayOutputStream,
+        mockSecurityManager);
+
+    AuthenticationAPI.SimpleAuthenticationResponse simpleAuthenticationResponse =
+        getSimpleAuthenticationResponse(byteArrayOutputStream);
+
+    assertFalse(simpleAuthenticationResponse.getAuthenticated());
+    assertFalse(protobufSimpleAuthenticator.isAuthenticated());
+  }
+
+  private AuthenticationAPI.SimpleAuthenticationResponse getSimpleAuthenticationResponse(
+      ByteArrayOutputStream outputStream) throws IOException {
+    ByteArrayInputStream responseStream = new ByteArrayInputStream(outputStream.toByteArray());
+    return AuthenticationAPI.SimpleAuthenticationResponse.parseDelimitedFrom(responseStream);
+  }
+}