You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by pr...@apache.org on 2018/01/25 23:27:23 UTC

[geode] branch develop updated: GEODE-3738: Use DUnit Rules to remove flakiness from legacy Authorization tests

This is an automated email from the ASF dual-hosted git repository.

prhomberg pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e664ce6  GEODE-3738: Use DUnit Rules to remove flakiness from legacy Authorization tests
e664ce6 is described below

commit e664ce6c07a7d338ee6e48160deb82f85e8e01d3
Author: Patrick Rhomberg <Pu...@users.noreply.github.com>
AuthorDate: Thu Jan 25 15:27:21 2018 -0800

    GEODE-3738: Use DUnit Rules to remove flakiness from legacy Authorization tests
---
 .../security/ClientAuthorizationDUnitTest.java     | 756 ---------------------
 ...tAuthorizationLegacyConfigurationDUnitTest.java | 216 ++++++
 ...aAuthorizationUsingLegacySecurityDUnitTest.java | 246 +++++++
 ...onUsingLegacySecurityWithFailoverDUnitTest.java | 423 ++++++++++++
 .../security/templates/SimpleAccessController.java |  89 +++
 .../security/templates/SimpleAuthenticator.java    |  58 ++
 6 files changed, 1032 insertions(+), 756 deletions(-)

diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java
deleted file mode 100644
index a16600e..0000000
--- a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationDUnitTest.java
+++ /dev/null
@@ -1,756 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.security;
-
-import static org.apache.geode.internal.AvailablePort.SOCKET;
-import static org.apache.geode.internal.AvailablePort.getRandomAvailablePort;
-import static org.apache.geode.security.SecurityTestUtils.AUTHFAIL_EXCEPTION;
-import static org.apache.geode.security.SecurityTestUtils.NOTAUTHZ_EXCEPTION;
-import static org.apache.geode.security.SecurityTestUtils.NO_EXCEPTION;
-import static org.apache.geode.security.SecurityTestUtils.OTHER_EXCEPTION;
-import static org.apache.geode.security.SecurityTestUtils.closeCache;
-import static org.apache.geode.security.SecurityTestUtils.concatProperties;
-import static org.apache.geode.security.SecurityTestUtils.createCacheClient;
-import static org.apache.geode.security.SecurityTestUtils.createCacheClientWithDynamicRegion;
-import static org.apache.geode.security.SecurityTestUtils.doGets;
-import static org.apache.geode.security.SecurityTestUtils.doNGets;
-import static org.apache.geode.security.SecurityTestUtils.doNPuts;
-import static org.apache.geode.security.SecurityTestUtils.doPutAllP;
-import static org.apache.geode.security.SecurityTestUtils.doPuts;
-import static org.apache.geode.security.SecurityTestUtils.getLocatorPort;
-import static org.apache.geode.test.dunit.Assert.fail;
-import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
-import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import org.apache.geode.cache.operations.OperationContext.OperationCode;
-import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
-import org.apache.geode.security.generator.AuthzCredentialGenerator;
-import org.apache.geode.security.generator.CredentialGenerator;
-import org.apache.geode.security.generator.DummyCredentialGenerator;
-import org.apache.geode.security.generator.XmlAuthzCredentialGenerator;
-import org.apache.geode.security.templates.UserPasswordAuthInit;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.standalone.VersionManager;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
-import org.apache.geode.test.junit.categories.SecurityTest;
-import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
-
-/**
- * Tests for authorization from client to server. This tests for authorization of all operations
- * with both valid and invalid credentials/modules with pre-operation callbacks. It also checks for
- * authorization in case of failover.
- *
- * @since GemFire 5.5
- */
-@Category({DistributedTest.class, SecurityTest.class, FlakyTest.class}) // GEODE-3738
-@RunWith(Parameterized.class)
-@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
-public class ClientAuthorizationDUnitTest extends ClientAuthorizationTestCase {
-  @Parameterized.Parameters
-  public static Collection<String> data() {
-    List<String> result = VersionManager.getInstance().getVersionsWithoutCurrent();
-    result.add("000");
-    result.remove("130");
-    if (result.size() < 1) {
-      throw new RuntimeException("No older versions of Geode were found to test against");
-    } else {
-      System.out.println("running against these versions: " + result);
-    }
-    return result;
-  }
-
-  public ClientAuthorizationDUnitTest(String version) {
-    super();
-    clientVersion = version;
-  }
-
-
-  @Override
-  public final void preTearDownClientAuthorizationTestBase() throws Exception {
-    closeCache();
-  }
-
-  @Test
-  public void testAllowPutsGets() {
-    AuthzCredentialGenerator gen = getXmlAuthzGenerator();
-    CredentialGenerator cGen = gen.getCredentialGenerator();
-    Properties extraAuthProps = cGen.getSystemProperties();
-    Properties javaProps = cGen.getJavaProperties();
-    Properties extraAuthzProps = gen.getSystemProperties();
-    String authenticator = cGen.getAuthenticator();
-    String authInit = cGen.getAuthInit();
-    String accessor = gen.getAuthorizationCallback();
-
-    getLogWriter().info("testAllowPutsGets: Using authinit: " + authInit);
-    getLogWriter().info("testAllowPutsGets: Using authenticator: " + authenticator);
-    getLogWriter().info("testAllowPutsGets: Using accessor: " + accessor);
-
-    // Start servers with all required properties
-    Properties serverProps =
-        buildProperties(authenticator, accessor, false, extraAuthProps, extraAuthzProps);
-
-    int port1 = createServer1(javaProps, serverProps);
-    int port2 = createServer2(javaProps, serverProps);
-
-    // Start client1 with valid CREATE credentials
-    Properties createCredentials = gen.getAllowedCredentials(
-        new OperationCode[] {OperationCode.PUT}, new String[] {regionName}, 1);
-    javaProps = cGen.getJavaProperties();
-
-    getLogWriter().info("testAllowPutsGets: For first client credentials: " + createCredentials);
-
-    createClient1NoException(javaProps, authInit, port1, port2, createCredentials);
-
-    // Start client2 with valid GET credentials
-    Properties getCredentials = gen.getAllowedCredentials(new OperationCode[] {OperationCode.GET},
-        new String[] {regionName}, 2);
-    javaProps = cGen.getJavaProperties();
-
-    getLogWriter().info("testAllowPutsGets: For second client credentials: " + getCredentials);
-
-    createClient2NoException(javaProps, authInit, port1, port2, getCredentials);
-
-    // Perform some put operations from client1
-    client1.invoke(() -> doPuts(2, NO_EXCEPTION));
-
-    // Verify that the gets succeed
-    client2.invoke(() -> doGets(2, NO_EXCEPTION));
-  }
-
-  @Test
-  public void testPutAllWithSecurity() {
-    AuthzCredentialGenerator gen = getXmlAuthzGenerator();
-    CredentialGenerator cGen = gen.getCredentialGenerator();
-    Properties extraAuthProps = cGen.getSystemProperties();
-    Properties javaProps = cGen.getJavaProperties();
-    Properties extraAuthzProps = gen.getSystemProperties();
-    String authenticator = cGen.getAuthenticator();
-    String authInit = cGen.getAuthInit();
-    String accessor = gen.getAuthorizationCallback();
-
-    // this test registers a PDX type and needs the servers to accept it without
-    // credentials in old clients
-    if (clientVersion.compareTo("130") < 0 && !VersionManager.isCurrentVersion(clientVersion)) {
-      server1.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
-      server2.invoke(() -> ServerConnection.allowInternalMessagesWithoutCredentials = true);
-    }
-
-    getLogWriter().info("testPutAllWithSecurity: Using authinit: " + authInit);
-    getLogWriter().info("testPutAllWithSecurity: Using authenticator: " + authenticator);
-    getLogWriter().info("testPutAllWithSecurity: Using accessor: " + accessor);
-
-    // Start servers with all required properties
-    Properties serverProps =
-        buildProperties(authenticator, accessor, false, extraAuthProps, extraAuthzProps);
-
-    int port1 = createServer1(javaProps, serverProps);
-    int port2 = createServer2(javaProps, serverProps);
-
-    // Start client1 with valid CREATE credentials
-    Properties createCredentials = gen.getAllowedCredentials(
-        new OperationCode[] {OperationCode.PUTALL}, new String[] {regionName}, 1);
-    javaProps = cGen.getJavaProperties();
-
-    getLogWriter()
-        .info("testPutAllWithSecurity: For first client credentials: " + createCredentials);
-
-    createClient1NoException(javaProps, authInit, port1, port2, createCredentials);
-
-    // Perform some put all operations from client1
-    client1.invoke(() -> doPutAllP());
-  }
-
-  @Test
-  public void testDisallowPutsGets() {
-    AuthzCredentialGenerator gen = getXmlAuthzGenerator();
-    CredentialGenerator cGen = gen.getCredentialGenerator();
-    Properties extraAuthProps = cGen.getSystemProperties();
-    Properties javaProps = cGen.getJavaProperties();
-    Properties extraAuthzProps = gen.getSystemProperties();
-    String authenticator = cGen.getAuthenticator();
-    String authInit = cGen.getAuthInit();
-    String accessor = gen.getAuthorizationCallback();
-
-    getLogWriter().info("testDisallowPutsGets: Using authinit: " + authInit);
-    getLogWriter().info("testDisallowPutsGets: Using authenticator: " + authenticator);
-    getLogWriter().info("testDisallowPutsGets: Using accessor: " + accessor);
-
-    // Check that we indeed can obtain valid credentials not allowed to do gets
-    Properties createCredentials = gen.getAllowedCredentials(
-        new OperationCode[] {OperationCode.PUT}, new String[] {regionName}, 1);
-    Properties createJavaProps = cGen.getJavaProperties();
-
-    getLogWriter().info("testDisallowPutsGets: For first client credentials: " + createCredentials);
-
-    Properties getCredentials = gen.getDisallowedCredentials(
-        new OperationCode[] {OperationCode.GET}, new String[] {regionName}, 2);
-    Properties getJavaProps = cGen.getJavaProperties();
-
-    getLogWriter().info(
-        "testDisallowPutsGets: For second client disallowed GET credentials: " + getCredentials);
-
-    // Start servers with all required properties
-    Properties serverProps =
-        buildProperties(authenticator, accessor, false, extraAuthProps, extraAuthzProps);
-
-    int port1 = createServer1(javaProps, serverProps);
-    int port2 = createServer2(javaProps, serverProps);
-
-    createClient1NoException(createJavaProps, authInit, port1, port2, createCredentials);
-
-    createClient2NoException(getJavaProps, authInit, port1, port2, getCredentials);
-
-    // Perform some put operations from client1
-    client1.invoke(() -> doPuts(2, NO_EXCEPTION));
-
-    // Gets as normal user should throw exception
-    client2.invoke(() -> doGets(2, NOTAUTHZ_EXCEPTION));
-
-    // Try to connect client2 with reader credentials
-    getCredentials = gen.getAllowedCredentials(new OperationCode[] {OperationCode.GET},
-        new String[] {regionName}, 5);
-    getJavaProps = cGen.getJavaProperties();
-
-    getLogWriter()
-        .info("testDisallowPutsGets: For second client with GET credentials: " + getCredentials);
-
-    createClient2NoException(getJavaProps, authInit, port1, port2, getCredentials);
-
-    // Verify that the gets succeed
-    client2.invoke(() -> doGets(2, NO_EXCEPTION));
-
-    // Verify that the puts throw exception
-    client2.invoke(() -> doNPuts(2, NOTAUTHZ_EXCEPTION));
-  }
-
-  @Test
-  public void testInvalidAccessor() {
-    AuthzCredentialGenerator gen = getXmlAuthzGenerator();
-    CredentialGenerator cGen = gen.getCredentialGenerator();
-    Properties extraAuthProps = cGen.getSystemProperties();
-    Properties javaProps = cGen.getJavaProperties();
-    Properties extraAuthzProps = gen.getSystemProperties();
-    String authenticator = cGen.getAuthenticator();
-    String authInit = cGen.getAuthInit();
-    String accessor = gen.getAuthorizationCallback();
-
-    getLogWriter().info("testInvalidAccessor: Using authinit: " + authInit);
-    getLogWriter().info("testInvalidAccessor: Using authenticator: " + authenticator);
-
-    // Start server1 with invalid accessor
-    Properties serverProps =
-        buildProperties(authenticator, "org.apache.none", false, extraAuthProps, extraAuthzProps);
-
-    int port1 = createServer1(javaProps, serverProps);
-    int port2 = getRandomAvailablePort(SOCKET);
-
-    // Client creation should throw exceptions
-    Properties createCredentials = gen.getAllowedCredentials(
-        new OperationCode[] {OperationCode.PUT}, new String[] {regionName}, 3);
-    Properties createJavaProps = cGen.getJavaProperties();
-
-    getLogWriter()
-        .info("testInvalidAccessor: For first client CREATE credentials: " + createCredentials);
-
-    Properties getCredentials = gen.getAllowedCredentials(new OperationCode[] {OperationCode.GET},
-        new String[] {regionName}, 7);
-    Properties getJavaProps = cGen.getJavaProperties();
-
-    getLogWriter()
-        .info("testInvalidAccessor: For second client GET credentials: " + getCredentials);
-
-    client1.invoke(() -> ClientAuthenticationTestUtils.createCacheClient(authInit,
-        createCredentials, createJavaProps, port1, port2, 0, false, false, NO_EXCEPTION));
-    client1.invoke(() -> doPuts(1, AUTHFAIL_EXCEPTION));
-
-    client2.invoke(() -> ClientAuthenticationTestUtils.createCacheClient(authInit, getCredentials,
-        getJavaProps, port1, port2, 0, false, false, NO_EXCEPTION));
-    client2.invoke(() -> doPuts(1, AUTHFAIL_EXCEPTION));
-
-    // Now start server2 that has valid accessor
-    getLogWriter().info("testInvalidAccessor: Using accessor: " + accessor);
-    serverProps = buildProperties(authenticator, accessor, false, extraAuthProps, extraAuthzProps);
-    createServer2(javaProps, serverProps, port2);
-    server1.invoke(() -> closeCache());
-
-    createClient1NoException(createJavaProps, authInit, port1, port2, createCredentials);
-    createClient2NoException(getJavaProps, authInit, port1, port2, getCredentials);
-
-    // Now perform some put operations from client1
-    client1.invoke(() -> doPuts(4, NO_EXCEPTION));
-
-    // Verify that the gets succeed
-    client2.invoke(() -> doGets(4, NO_EXCEPTION));
-  }
-
-  @Test
-  public void testPutsGetsWithFailover() {
-    AuthzCredentialGenerator gen = getXmlAuthzGenerator();
-    CredentialGenerator cGen = gen.getCredentialGenerator();
-    Properties extraAuthProps = cGen.getSystemProperties();
-    Properties javaProps = cGen.getJavaProperties();
-    Properties extraAuthzProps = gen.getSystemProperties();
-    String authenticator = cGen.getAuthenticator();
-    String authInit = cGen.getAuthInit();
-    String accessor = gen.getAuthorizationCallback();
-
-    getLogWriter().info("testPutsGetsWithFailover: Using authinit: " + authInit);
-    getLogWriter().info("testPutsGetsWithFailover: Using authenticator: " + authenticator);
-    getLogWriter().info("testPutsGetsWithFailover: Using accessor: " + accessor);
-
-    // Start servers with all required properties
-    Properties serverProps =
-        buildProperties(authenticator, accessor, false, extraAuthProps, extraAuthzProps);
-
-    int port1 = createServer1(javaProps, serverProps);
-
-    // Get a port for second server but do not start it. This forces the clients to connect to the
-    // first server
-    int port2 = getRandomAvailablePort(SOCKET);
-
-    // Start client1 with valid CREATE credentials
-    Properties createCredentials = gen.getAllowedCredentials(
-        new OperationCode[] {OperationCode.PUT}, new String[] {regionName}, 1);
-    Properties createJavaProps = cGen.getJavaProperties();
-
-    getLogWriter()
-        .info("testPutsGetsWithFailover: For first client credentials: " + createCredentials);
-
-    createClient1NoException(createJavaProps, authInit, port1, port2, createCredentials);
-
-    // Start client2 with valid GET credentials
-    Properties getCredentials = gen.getAllowedCredentials(new OperationCode[] {OperationCode.GET},
-        new String[] {regionName}, 5);
-    Properties getJavaProps = cGen.getJavaProperties();
-
-    getLogWriter()
-        .info("testPutsGetsWithFailover: For second client credentials: " + getCredentials);
-
-    createClient2NoException(getJavaProps, authInit, port1, port2, getCredentials);
-
-    // Perform some put operations from client1
-    client1.invoke(() -> doPuts(2, NO_EXCEPTION));
-
-    // Verify that the puts succeeded
-    client2.invoke(() -> doGets(2, NO_EXCEPTION));
-
-    createServer2(javaProps, serverProps, port2);
-    server1.invoke(() -> closeCache());
-
-    // Perform some put operations from client1
-    client1.invoke(() -> doNPuts(4, NO_EXCEPTION));
-
-    // Verify that the puts succeeded
-    client2.invoke(() -> doNGets(4, NO_EXCEPTION));
-
-    // Now re-connect with credentials not allowed to do gets
-    Properties noGetCredentials = gen.getDisallowedCredentials(
-        new OperationCode[] {OperationCode.GET}, new String[] {regionName}, 9);
-    getJavaProps = cGen.getJavaProperties();
-
-    getLogWriter().info("testPutsGetsWithFailover: For second client disallowed GET credentials: "
-        + noGetCredentials);
-
-    createClient2NoException(getJavaProps, authInit, port1, port2, noGetCredentials);
-
-    // Perform some put operations from client1
-    client1.invoke(() -> doPuts(4, NO_EXCEPTION));
-
-    // Gets as normal user should throw exception
-    client2.invoke(() -> doGets(4, NOTAUTHZ_EXCEPTION));
-
-    // force a failover and do the drill again
-    server1.invoke(() -> ClientAuthorizationTestCase.createCacheServer(getLocatorPort(), port1,
-        serverProps, javaProps));
-    server2.invoke(() -> closeCache());
-
-    // Perform some put operations from client1
-    client1.invoke(() -> doNPuts(4, NO_EXCEPTION));
-
-    // Gets as normal user should throw exception
-    client2.invoke(() -> doNGets(4, NOTAUTHZ_EXCEPTION));
-
-    createClient2NoException(getJavaProps, authInit, port1, port2, getCredentials);
-
-    // Verify that the gets succeed
-    client2.invoke(() -> doNGets(4, NO_EXCEPTION));
-
-    // Verify that the puts throw exception
-    client2.invoke(() -> doPuts(4, NOTAUTHZ_EXCEPTION));
-  }
-
-  @Test
-  public void testUnregisterInterestWithFailover() throws InterruptedException {
-    OperationWithAction[] unregisterOps = unregisterOpsForTestUnregisterInterestWithFailover();
-
-    AuthzCredentialGenerator gen = new XmlAuthzCredentialGenerator();
-    CredentialGenerator cGen = new DummyCredentialGenerator();
-    cGen.init();
-    gen.init(cGen);
-    Properties extraAuthProps = cGen.getSystemProperties();
-    Properties javaProps = cGen.getJavaProperties();
-    Properties extraAuthzProps = gen.getSystemProperties();
-    String authenticator = cGen.getAuthenticator();
-    String authInit = cGen.getAuthInit();
-    String accessor = gen.getAuthorizationCallback();
-
-    getLogWriter().info("testAllOpsWithFailover: Using authinit: " + authInit);
-    getLogWriter().info("testAllOpsWithFailover: Using authenticator: " + authenticator);
-    getLogWriter().info("testAllOpsWithFailover: Using accessor: " + accessor);
-
-    // Start servers with all required properties
-    Properties serverProps =
-        buildProperties(authenticator, accessor, false, extraAuthProps, extraAuthzProps);
-
-    // Get ports for the servers
-    int[] randomAvailableTCPPorts = AvailablePortHelper.getRandomAvailableTCPPorts(2);
-    int port1 = randomAvailableTCPPorts[0];
-    int port2 = randomAvailableTCPPorts[1];
-
-    // Perform all the ops on the clients
-    List opBlock = new ArrayList();
-    for (int opNum = 0; opNum < unregisterOps.length; ++opNum) {
-
-      // Start client with valid credentials as specified in OperationWithAction
-      OperationWithAction currentOp = unregisterOps[opNum];
-      if (currentOp.equals(OperationWithAction.OPBLOCK_END)
-          || currentOp.equals(OperationWithAction.OPBLOCK_NO_FAILOVER)) {
-
-        // End of current operation block; execute all the operations on the servers with/without
-        // failover
-        if (opBlock.size() > 0) {
-          // Start the first server and execute the operation block
-          server1.invoke(() -> ClientAuthorizationTestCase.createCacheServer(getLocatorPort(),
-              port1, serverProps, javaProps));
-          server2.invoke(() -> closeCache());
-
-          executeRIOpBlock(opBlock, port1, port2, authInit, extraAuthProps, extraAuthzProps,
-              javaProps);
-
-          if (!currentOp.equals(OperationWithAction.OPBLOCK_NO_FAILOVER)) {
-            createServer2(javaProps, serverProps, port2);
-            server1.invoke(() -> closeCache());
-
-            executeRIOpBlock(opBlock, port1, port2, authInit, extraAuthProps, extraAuthzProps,
-                javaProps);
-          }
-          opBlock.clear();
-        }
-
-      } else {
-        currentOp.setOpNum(opNum);
-        opBlock.add(currentOp);
-      }
-    }
-  }
-
-  @Test
-  public void testAllOpsWithFailover() throws InterruptedException {
-    addIgnoredException("Read timed out");
-    runOpsWithFailOver(allOpsForAllOpsWithFailover(), "testAllOpsWithFailover");
-  }
-
-  private OperationWithAction[] unregisterOpsForTestUnregisterInterestWithFailover() {
-    return new OperationWithAction[] {
-        // Register interest in all KEYS using one key at a time
-        new OperationWithAction(OperationCode.REGISTER_INTEREST, OperationCode.UNREGISTER_INTEREST,
-            3, OpFlags.NONE, 4),
-        new OperationWithAction(OperationCode.REGISTER_INTEREST, 2),
-        // UPDATE and test with GET
-        new OperationWithAction(OperationCode.PUT),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.LOCAL_OP, 4),
-
-        // Unregister interest in all KEYS using one key at a time
-        new OperationWithAction(OperationCode.UNREGISTER_INTEREST, 3,
-            OpFlags.USE_OLDCONN | OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.UNREGISTER_INTEREST, 2, OpFlags.USE_OLDCONN, 4),
-        // UPDATE and test with GET for no updates
-        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN | OpFlags.USE_NEWVAL, 4),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.LOCAL_OP, 4),
-
-        OperationWithAction.OPBLOCK_END,
-
-        // Register interest in all KEYS using list
-        new OperationWithAction(OperationCode.REGISTER_INTEREST, OperationCode.UNREGISTER_INTEREST,
-            3, OpFlags.USE_LIST, 4),
-        new OperationWithAction(OperationCode.REGISTER_INTEREST, 1, OpFlags.USE_LIST, 4),
-        // UPDATE and test with GET
-        new OperationWithAction(OperationCode.PUT, 2),
-        new OperationWithAction(OperationCode.GET, 1, OpFlags.USE_OLDCONN | OpFlags.LOCAL_OP, 4),
-
-        // Unregister interest in all KEYS using list
-        new OperationWithAction(OperationCode.UNREGISTER_INTEREST, 3,
-            OpFlags.USE_OLDCONN | OpFlags.USE_LIST | OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.UNREGISTER_INTEREST, 1,
-            OpFlags.USE_OLDCONN | OpFlags.USE_LIST, 4),
-        // UPDATE and test with GET for no updates
-        new OperationWithAction(OperationCode.PUT, 2, OpFlags.USE_OLDCONN | OpFlags.USE_NEWVAL, 4),
-        new OperationWithAction(OperationCode.GET, 1, OpFlags.USE_OLDCONN | OpFlags.LOCAL_OP, 4),
-
-        OperationWithAction.OPBLOCK_END,
-
-        // Register interest in all KEYS using regular expression
-        new OperationWithAction(OperationCode.REGISTER_INTEREST, OperationCode.UNREGISTER_INTEREST,
-            3, OpFlags.USE_REGEX, 4),
-        new OperationWithAction(OperationCode.REGISTER_INTEREST, 2, OpFlags.USE_REGEX, 4),
-        // UPDATE and test with GET
-        new OperationWithAction(OperationCode.PUT),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.LOCAL_OP, 4),
-
-        // Unregister interest in all KEYS using regular expression
-        new OperationWithAction(OperationCode.UNREGISTER_INTEREST, 3,
-            OpFlags.USE_OLDCONN | OpFlags.USE_REGEX | OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.UNREGISTER_INTEREST, 2,
-            OpFlags.USE_OLDCONN | OpFlags.USE_REGEX, 4),
-        // UPDATE and test with GET for no updates
-        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN | OpFlags.USE_NEWVAL, 4),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.LOCAL_OP, 4),
-
-        OperationWithAction.OPBLOCK_END};
-  }
-
-  private OperationWithAction[] allOpsForAllOpsWithFailover() {
-    return new OperationWithAction[] {
-        // Test CREATE and verify with a GET
-        new OperationWithAction(OperationCode.PUT, 3, OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.PUT),
-        new OperationWithAction(OperationCode.GET, 3, OpFlags.CHECK_NOKEY | OpFlags.CHECK_NOTAUTHZ,
-            4),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.CHECK_NOKEY, 4),
-
-        // OPBLOCK_END indicates end of an operation block; the above block of three operations will
-        // be first executed on server1 and then on server2 after failover
-        OperationWithAction.OPBLOCK_END,
-
-        // Test PUTALL and verify with GETs
-        new OperationWithAction(OperationCode.PUTALL, 3,
-            OpFlags.USE_NEWVAL | OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.PUTALL, 1, OpFlags.USE_NEWVAL, 4),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.USE_NEWVAL, 4),
-        OperationWithAction.OPBLOCK_END,
-
-        // Test UPDATE and verify with a GET
-        new OperationWithAction(OperationCode.PUT, 3, OpFlags.USE_NEWVAL | OpFlags.CHECK_NOTAUTHZ,
-            4),
-        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_NEWVAL, 4),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.USE_NEWVAL, 4),
-
-        OperationWithAction.OPBLOCK_END,
-
-        // Test DESTROY and verify with a GET and that key should not exist
-        new OperationWithAction(OperationCode.DESTROY, 3,
-            OpFlags.USE_NEWVAL | OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.DESTROY),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.CHECK_FAIL, 4),
-        // Repopulate the region
-        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_NEWVAL, 4),
-
-        OperationWithAction.OPBLOCK_END,
-
-        // Check CONTAINS_KEY
-        new OperationWithAction(OperationCode.CONTAINS_KEY, 3, OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.CONTAINS_KEY),
-        // Destroy the KEYS and check for failure in CONTAINS_KEY
-        new OperationWithAction(OperationCode.DESTROY, 2),
-        new OperationWithAction(OperationCode.CONTAINS_KEY, 3,
-            OpFlags.CHECK_FAIL | OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.CONTAINS_KEY, 1,
-            OpFlags.USE_OLDCONN | OpFlags.CHECK_FAIL, 4),
-        // Repopulate the region
-        new OperationWithAction(OperationCode.PUT),
-
-        OperationWithAction.OPBLOCK_END,
-
-        // Check KEY_SET
-        new OperationWithAction(OperationCode.KEY_SET, 3, OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.KEY_SET, 2),
-
-        OperationWithAction.OPBLOCK_END,
-
-        // Check QUERY
-        new OperationWithAction(OperationCode.QUERY, 3, OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.QUERY),
-
-        OperationWithAction.OPBLOCK_END,
-
-        // Register interest in all KEYS using one key at a time
-        new OperationWithAction(OperationCode.REGISTER_INTEREST, 3, OpFlags.CHECK_NOTAUTHZ, 4),
-        new OperationWithAction(OperationCode.REGISTER_INTEREST, 2),
-        // UPDATE and test with GET
-        new OperationWithAction(OperationCode.PUT),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.LOCAL_OP, 4),
-
-        // Unregister interest in all KEYS using one key at a time
-        new OperationWithAction(OperationCode.UNREGISTER_INTEREST, 2, OpFlags.USE_OLDCONN, 4),
-        // UPDATE and test with GET for no updates
-        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN | OpFlags.USE_NEWVAL, 4),
-        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN | OpFlags.LOCAL_OP, 4),
-
-        OperationWithAction.OPBLOCK_END,
-
-        // Test GET_ENTRY inside a TX, see #49951
-        new OperationWithAction(OperationCode.GET, 2,
-            OpFlags.USE_GET_ENTRY_IN_TX | OpFlags.CHECK_FAIL, 4),
-
-        OperationWithAction.OPBLOCK_END};
-
-  }
-
-  private Properties getUserPassword(final String userName) {
-    Properties props = new Properties();
-    props.setProperty(UserPasswordAuthInit.USER_NAME, userName);
-    props.setProperty(UserPasswordAuthInit.PASSWORD, userName);
-    return props;
-  }
-
-  private void executeRIOpBlock(final List<OperationWithAction> opBlock, final int port1,
-      final int port2, final String authInit, final Properties extraAuthProps,
-      final Properties extraAuthzProps, final Properties javaProps) throws InterruptedException {
-    for (Iterator opIter = opBlock.iterator(); opIter.hasNext();) {
-      // Start client with valid credentials as specified in OperationWithAction
-      OperationWithAction currentOp = (OperationWithAction) opIter.next();
-      OperationCode opCode = currentOp.getOperationCode();
-      int opFlags = currentOp.getFlags();
-      int clientNum = currentOp.getClientNum();
-      VM clientVM = null;
-      boolean useThisVM = false;
-
-      switch (clientNum) {
-        case 1:
-          clientVM = client1;
-          break;
-        case 2:
-          clientVM = client2;
-          break;
-        case 3:
-          useThisVM = true;
-          break;
-        default:
-          fail("executeRIOpBlock: Unknown client number " + clientNum);
-          break;
-      }
-
-      getLogWriter().info("executeRIOpBlock: performing operation number [" + currentOp.getOpNum()
-          + "]: " + currentOp);
-      if ((opFlags & OpFlags.USE_OLDCONN) == 0) {
-        Properties opCredentials = null;
-        String currentRegionName = '/' + regionName;
-        if ((opFlags & OpFlags.USE_SUBREGION) > 0) {
-          currentRegionName += ('/' + SUBREGION_NAME);
-        }
-        String credentialsTypeStr;
-        OperationCode authOpCode = currentOp.getAuthzOperationCode();
-
-        if ((opFlags & OpFlags.CHECK_NOTAUTHZ) > 0 || (opFlags & OpFlags.USE_NOTAUTHZ) > 0
-            || !authOpCode.equals(opCode)) {
-          credentialsTypeStr = " unauthorized " + authOpCode;
-          if (authOpCode.isRegisterInterest()) {
-            opCredentials = getUserPassword("reader7");
-          } else if (authOpCode.isUnregisterInterest()) {
-            opCredentials = getUserPassword("reader6");
-          } else {
-            fail("executeRIOpBlock: cannot determine credentials for" + credentialsTypeStr);
-          }
-
-        } else {
-          credentialsTypeStr = " authorized " + authOpCode;
-          if (authOpCode.isRegisterInterest() || authOpCode.isUnregisterInterest()) {
-            opCredentials = getUserPassword("reader5");
-          } else if (authOpCode.isPut()) {
-            opCredentials = getUserPassword("writer1");
-          } else if (authOpCode.isGet()) {
-            opCredentials = getUserPassword("reader1");
-          } else {
-            fail("executeRIOpBlock: cannot determine credentials for" + credentialsTypeStr);
-          }
-        }
-
-        Properties clientProps =
-            concatProperties(new Properties[] {opCredentials, extraAuthProps, extraAuthzProps});
-
-        // Start the client with valid credentials but allowed or disallowed to perform an operation
-        getLogWriter().info("executeRIOpBlock: For client" + clientNum + credentialsTypeStr
-            + " credentials: " + opCredentials);
-        if (useThisVM) {
-          createCacheClientWithDynamicRegion(authInit, clientProps, javaProps,
-              new int[] {port1, port2}, 0, false, NO_EXCEPTION);
-        } else {
-          clientVM.invoke(() -> createCacheClient(authInit, clientProps, javaProps,
-              new int[] {port1, port2}, 0, false, NO_EXCEPTION));
-        }
-
-      }
-
-      int expectedResult;
-      if ((opFlags & OpFlags.CHECK_NOTAUTHZ) > 0) {
-        expectedResult = NOTAUTHZ_EXCEPTION;
-      } else if ((opFlags & OpFlags.CHECK_EXCEPTION) > 0) {
-        expectedResult = OTHER_EXCEPTION;
-      } else {
-        expectedResult = NO_EXCEPTION;
-      }
-
-      // Perform the operation from selected client
-      if (useThisVM) {
-        doOp(opCode, currentOp.getIndices(), opFlags, expectedResult);
-
-      } else {
-        int[] indices = currentOp.getIndices();
-        clientVM.invoke(
-            () -> ClientAuthorizationTestCase.doOp(opCode, indices, opFlags, expectedResult));
-      }
-    }
-  }
-
-  private void createClient2NoException(final Properties javaProps, final String authInit,
-      final int port1, final int port2, final Properties getCredentials) {
-    client2.invoke(() -> ClientAuthenticationTestUtils.createCacheClient(authInit, getCredentials,
-        javaProps, port1, port2, 0, NO_EXCEPTION));
-  }
-
-  private void createClient1NoException(final Properties javaProps, final String authInit,
-      final int port1, final int port2, final Properties createCredentials) {
-    client1.invoke(() -> ClientAuthenticationTestUtils.createCacheClient(authInit,
-        createCredentials, javaProps, port1, port2, 0, NO_EXCEPTION));
-  }
-
-  private int createServer2(final Properties javaProps, final Properties serverProps) {
-    return server2.invoke(() -> ClientAuthorizationTestCase.createCacheServer(getLocatorPort(),
-        serverProps, javaProps));
-  }
-
-  private int createServer1(final Properties javaProps, final Properties serverProps) {
-    return server1.invoke(() -> ClientAuthorizationTestCase.createCacheServer(getLocatorPort(),
-        serverProps, javaProps));
-  }
-
-  private void createServer2(Properties javaProps, Properties serverProps, int port2) {
-    server2.invoke(() -> ClientAuthorizationTestCase.createCacheServer(getLocatorPort(), port2,
-        serverProps, javaProps));
-  }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationLegacyConfigurationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationLegacyConfigurationDUnitTest.java
new file mode 100644
index 0000000..c38893f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientAuthorizationLegacyConfigurationDUnitTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.security;
+
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR;
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.security.templates.SimpleAccessController;
+import org.apache.geode.security.templates.SimpleAuthenticator;
+import org.apache.geode.security.templates.UserPasswordAuthInit;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.standalone.VersionManager;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+
+@Category({DistributedTest.class, SecurityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class ClientAuthorizationLegacyConfigurationDUnitTest {
+
+  @Rule
+  public ClusterStartupRule csRule = new ClusterStartupRule();
+
+  private MemberVM locator;
+  private MemberVM server;
+  private static String regionName = "testRegion";
+
+  // Some data values against which we will test.
+  private static final String initKey = "server-placed-key";
+  private static final String initValue = "server-placed-value";
+  private static final String singleKey = "single-key";
+  private static final String singleValue = "single-value";
+  private static final String mapKey1 = "map-key1";
+  private static final String mapValue1 = "map-value1";
+  private static final String mapKey2 = "map-key2";
+  private static final String mapValue2 = "map-value2";
+  private static Map<String, String> keyValueMap = new HashMap<>();
+  static {
+    keyValueMap.put(mapKey1, mapValue1);
+    keyValueMap.put(mapKey2, mapValue2);
+  }
+
+  // Each test runs once per client version string returned by VersionManager (see data()).
+  @Parameterized.Parameter
+  public String clientVersion;
+
+  @Parameterized.Parameters(name = "clientVersion={0}")
+  public static Collection<String> data() {
+    return VersionManager.getInstance().getVersions();
+  }
+
+  @Test
+  public void everythingFailsWithInvalidAuthenticator() throws Exception {
+    Properties clusterProps = new Properties();
+    clusterProps.setProperty(SECURITY_CLIENT_AUTHENTICATOR,
+        "org.apache.geode.no.such.authenticator.create");
+    clusterProps.setProperty(SECURITY_CLIENT_ACCESSOR,
+        SimpleAccessController.class.getName() + ".create");
+    clusterProps.setProperty(UserPasswordAuthInit.USER_NAME, "cluster,data");
+    clusterProps.setProperty(UserPasswordAuthInit.PASSWORD, "cluster,data");
+    clusterProps.setProperty(SECURITY_CLIENT_AUTH_INIT,
+        UserPasswordAuthInit.class.getCanonicalName() + ".create");
+    // Bring up the cluster with the no.such.authenticator setting and seed one server-side entry.
+    locator = csRule.startLocatorVM(0, clusterProps);
+    server = csRule.startServerVM(1, clusterProps, locator.getPort());
+    server.invoke(() -> {
+      Cache cache = ClusterStartupRule.getCache();
+      RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.PARTITION);
+      Region<String, String> region = rf.create(regionName);
+      region.put(initKey, initValue);
+    });
+    // The client mirrors the cluster's plug-in settings but authenticates as the "data" user.
+    Properties clientProps = new Properties();
+    clientProps.setProperty(SECURITY_CLIENT_AUTHENTICATOR,
+        "org.apache.geode.no.such.authenticator.create");
+    clientProps.setProperty(SECURITY_CLIENT_ACCESSOR,
+        SimpleAccessController.class.getName() + ".create");
+    clientProps.setProperty(UserPasswordAuthInit.USER_NAME, "data");
+    clientProps.setProperty(UserPasswordAuthInit.PASSWORD, "data");
+    clientProps.setProperty(SECURITY_CLIENT_AUTH_INIT,
+        UserPasswordAuthInit.class.getCanonicalName() + ".create");
+
+    int locatorPort = locator.getPort();
+    Consumer<ClientCacheFactory> cacheSetup = (Serializable & Consumer<ClientCacheFactory>) cf -> cf
+        .addPoolLocator("localhost", locatorPort);
+
+    ClientVM client = csRule.startClientVM(2, clientProps, cacheSetup, clientVersion);
+    client.invoke(() -> {
+      ClientCache cache = ClusterStartupRule.getClientCache();
+      ClientRegionFactory<String, String> rf =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region<String, String> region = rf.create(regionName);
+
+      // Every get/put variant is expected to fail with an AuthenticationFailedException cause.
+      assertThatThrownBy(() -> region.get(initKey))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.get(singleKey, null))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.getAll(keyValueMap.keySet()))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.getAll(keyValueMap.keySet(), null))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.put(singleKey, singleValue))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.put(singleKey, singleValue, null))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.putAll(keyValueMap))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.putAll(keyValueMap, null))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+    });
+  }
+
+  @Test
+  public void everythingFailsWithInvalidAccessor() throws Exception {
+    Properties clusterProps = new Properties();
+    clusterProps.setProperty(SECURITY_CLIENT_AUTHENTICATOR,
+        SimpleAuthenticator.class.getCanonicalName() + ".create");
+    clusterProps.setProperty(SECURITY_CLIENT_ACCESSOR, "org.apache.geode.no.such.accessor.create");
+    // give cluster members super-user permissions for ease of testing / RMI invocation
+    clusterProps.setProperty(UserPasswordAuthInit.USER_NAME, "cluster,data");
+    clusterProps.setProperty(UserPasswordAuthInit.PASSWORD, "cluster,data");
+    clusterProps.setProperty(SECURITY_CLIENT_AUTH_INIT,
+        UserPasswordAuthInit.class.getCanonicalName() + ".create");
+    // Bring up the cluster with the no.such.accessor setting and seed one server-side entry.
+    locator = csRule.startLocatorVM(0, clusterProps);
+    server = csRule.startServerVM(1, clusterProps, locator.getPort());
+    server.invoke(() -> {
+      Cache cache = ClusterStartupRule.getCache();
+      RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.PARTITION);
+      Region<String, String> region = rf.create(regionName);
+      region.put(initKey, initValue);
+    });
+
+    Properties clientProps = new Properties();
+    clientProps.setProperty(SECURITY_CLIENT_AUTHENTICATOR,
+        SimpleAuthenticator.class.getCanonicalName() + ".create");
+    clientProps.setProperty(SECURITY_CLIENT_ACCESSOR, "org.apache.geode.no.such.accessor.create");
+    // unlike the cluster members, the client authenticates as the "data" user
+    clientProps.setProperty(UserPasswordAuthInit.USER_NAME, "data");
+    clientProps.setProperty(UserPasswordAuthInit.PASSWORD, "data");
+    clientProps.setProperty(SECURITY_CLIENT_AUTH_INIT,
+        UserPasswordAuthInit.class.getCanonicalName() + ".create");
+
+    int locatorPort = locator.getPort();
+    Consumer<ClientCacheFactory> cacheSetup = (Serializable & Consumer<ClientCacheFactory>) cf -> cf
+        .addPoolLocator("localhost", locatorPort);
+
+    ClientVM client = csRule.startClientVM(2, clientProps, cacheSetup, clientVersion);
+    client.invoke(() -> {
+      ClientCache cache = ClusterStartupRule.getClientCache();
+      ClientRegionFactory<String, String> rf =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region<String, String> region = rf.create(regionName);
+
+      // Every get/put variant is expected to fail with an AuthenticationFailedException cause.
+      assertThatThrownBy(() -> region.get(initKey))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.get(singleKey, null))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.getAll(keyValueMap.keySet()))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.getAll(keyValueMap.keySet(), null))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.put(singleKey, singleValue))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.put(singleKey, singleValue, null))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.putAll(keyValueMap))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+      assertThatThrownBy(() -> region.putAll(keyValueMap, null))
+          .hasCauseInstanceOf(AuthenticationFailedException.class);
+    });
+  }
+
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientDataAuthorizationUsingLegacySecurityDUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientDataAuthorizationUsingLegacySecurityDUnitTest.java
new file mode 100644
index 0000000..a9ec691
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientDataAuthorizationUsingLegacySecurityDUnitTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.security;
+
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR;
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.security.templates.SimpleAccessController;
+import org.apache.geode.security.templates.SimpleAuthenticator;
+import org.apache.geode.security.templates.UserPasswordAuthInit;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.standalone.VersionManager;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+
+/**
+ * Tests for authorization from client to server for data puts and gets. For a similar test that
+ * covers failover, see {@link ClientDataAuthorizationUsingLegacySecurityWithFailoverDUnitTest}.
+ *
+ * @since GemFire 5.5
+ */
+@Category({DistributedTest.class, SecurityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class ClientDataAuthorizationUsingLegacySecurityDUnitTest {
+  @Rule
+  public ClusterStartupRule csRule = new ClusterStartupRule();
+
+  private MemberVM locator;
+  private MemberVM server;
+  private static String regionName = "testRegion";
+
+  // Some data values against which we will test.
+  private static final String initKey = "server-placed-key";
+  private static final String initValue = "server-placed-value";
+  private static final String singleKey = "single-key";
+  private static final String singleValue = "single-value";
+  private static final String mapKey1 = "map-key1";
+  private static final String mapValue1 = "map-value1";
+  private static final String mapKey2 = "map-key2";
+  private static final String mapValue2 = "map-value2";
+  private static Map<String, String> keyValueMap = new HashMap<>();
+  static {
+    keyValueMap.put(mapKey1, mapValue1);
+    keyValueMap.put(mapKey2, mapValue2);
+  }
+
+  // Each test runs once per client version string returned by VersionManager (see data()).
+  @Parameterized.Parameter
+  public String clientVersion;
+
+  @Parameterized.Parameters(name = "clientVersion={0}")
+  public static Collection<String> data() {
+    return VersionManager.getInstance().getVersions();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    // We want the cluster VMs to be super-users for ease of testing / remote invocation.
+    Properties clusterMemberProperties = getVMPropertiesWithPermission("cluster,data");
+    // Start one locator and one server; the server creates the region and seeds initKey.
+    locator = csRule.startLocatorVM(0, clusterMemberProperties);
+    server = csRule.startServerVM(1, clusterMemberProperties, locator.getPort());
+    server.invoke(() -> {
+      Cache cache = ClusterStartupRule.getCache();
+      RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.PARTITION);
+      Region<String, String> region = rf.create(regionName);
+      region.put(initKey, initValue);
+    });
+  }
+
+  @Test
+  public void dataWriteClientCanPut() throws Exception {
+    Properties props = getVMPropertiesWithPermission("dataWrite");
+    int locatorPort = locator.getPort();
+
+    Consumer<ClientCacheFactory> cacheSetup = (Serializable & Consumer<ClientCacheFactory>) cf -> cf
+        .addPoolLocator("localhost", locatorPort);
+    ClientVM clientVM = csRule.startClientVM(2, props, cacheSetup, clientVersion);
+
+    // A client started with the "dataWrite" credentials performs a put and a putAll.
+    clientVM.invoke(() -> {
+      ClientCache cache = ClusterStartupRule.getClientCache();
+      ClientRegionFactory<String, String> rf =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region<String, String> region = rf.create(regionName);
+
+      region.put(singleKey, singleValue);
+      region.putAll(keyValueMap);
+    });
+
+    // Server-side, the region must hold exactly the seeded entry plus the client's puts.
+    server.invoke(() -> {
+      assertThat(ClusterStartupRule.getCache().getRegion(regionName))
+          .containsOnlyKeys(initKey, singleKey, mapKey1, mapKey2).containsEntry(initKey, initValue)
+          .containsEntry(singleKey, singleValue).containsEntry(mapKey1, mapValue1)
+          .containsEntry(mapKey2, mapValue2);
+    });
+  }
+
+  @Test
+  public void dataWriteCannotGet() throws Exception {
+    Properties props = getVMPropertiesWithPermission("dataWrite");
+    int locatorPort = locator.getPort();
+
+    Consumer<ClientCacheFactory> cacheSetup = (Serializable & Consumer<ClientCacheFactory>) cf -> cf
+        .addPoolLocator("localhost", locatorPort);
+    ClientVM client = csRule.startClientVM(2, props, cacheSetup, clientVersion);
+
+    // Single-key gets by a "dataWrite" client must fail with a NotAuthorizedException cause.
+    client.invoke(() -> {
+      ClientCache cache = ClusterStartupRule.getClientCache();
+      ClientRegionFactory<String, String> rf =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region<String, String> region = rf.create(regionName);
+
+      assertThatThrownBy(() -> region.get(initKey))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.get(singleKey, null))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      // An unauthorized getAll does not throw; it just does not return the requested values.
+      // See GEODE-3632.
+      assertThat(region.getAll(keyValueMap.keySet())).isEmpty();
+      assertThat(region.getAll(keyValueMap.keySet(), null)).isEmpty();
+    });
+  }
+
+  @Test
+  public void dataReadClientCanGet() throws Exception {
+    Properties props = getVMPropertiesWithPermission("dataRead");
+    int locatorPort = locator.getPort();
+
+    Consumer<ClientCacheFactory> cacheSetup = (Serializable & Consumer<ClientCacheFactory>) cf -> cf
+        .addPoolLocator("localhost", locatorPort);
+    ClientVM client = csRule.startClientVM(2, props, cacheSetup, clientVersion);
+
+    // Add some values for the client to get
+    server.invoke(() -> {
+      Cache cache = ClusterStartupRule.getCache();
+      Region<String, String> region = cache.getRegion(regionName);
+      region.put(singleKey, singleValue);
+      region.put(mapKey1, mapValue1);
+      region.put(mapKey2, mapValue2);
+    });
+
+    // A "dataRead" client reads the seeded entry and the entries the server just added.
+    client.invoke(() -> {
+      ClientCache cache = ClusterStartupRule.getClientCache();
+      ClientRegionFactory<String, String> rf =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region<String, String> region = rf.create(regionName);
+
+      assertThat(region.get(initKey)).isEqualTo(initValue);
+      assertThat(region.get(singleKey)).isEqualTo(singleValue);
+      assertThat(region.getAll(keyValueMap.keySet())).containsAllEntriesOf(keyValueMap);
+    });
+  }
+
+  @Test
+  public void dataReadCannotPut() throws Exception {
+    Properties props = getVMPropertiesWithPermission("dataRead");
+    int locatorPort = locator.getPort();
+    Consumer<ClientCacheFactory> cacheSetup = (Serializable & Consumer<ClientCacheFactory>) cf -> cf
+        .addPoolLocator("localhost", locatorPort);
+    ClientVM clientVM = csRule.startClientVM(2, props, cacheSetup, clientVersion);
+    // Every put variant by a "dataRead" client must fail with a NotAuthorizedException cause.
+    clientVM.invoke(() -> {
+      ClientCache cache = ClusterStartupRule.getClientCache();
+      ClientRegionFactory<String, String> rf =
+          cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
+      Region<String, String> region = rf.create(regionName);
+
+      assertThatThrownBy(() -> region.put(singleKey, singleValue))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.put(singleKey, singleValue, null))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.putAll(keyValueMap))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.putAll(keyValueMap, null))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+    });
+
+    // Confirm server-side that no put went through:
+    server.invoke(() -> {
+      assertThat(ClusterStartupRule.getCache().getRegion(regionName)).containsOnlyKeys(initKey)
+          .containsEntry(initKey, initValue);
+    });
+
+  }
+
+  private Properties getVMPropertiesWithPermission(String permission) {
+    // Each legacy plug-in property is set to the plug-in class name followed by ".create".
+    final String authenticatorFactory = SimpleAuthenticator.class.getCanonicalName() + ".create";
+    final String accessorFactory = SimpleAccessController.class.getCanonicalName() + ".create";
+    final String authInitFactory = UserPasswordAuthInit.class.getCanonicalName() + ".create";
+
+    Properties vmProperties = new Properties();
+    vmProperties.setProperty(SECURITY_CLIENT_AUTHENTICATOR, authenticatorFactory);
+    vmProperties.setProperty(SECURITY_CLIENT_ACCESSOR, accessorFactory);
+    // The permission string serves as both the user name and the password.
+    vmProperties.setProperty(UserPasswordAuthInit.USER_NAME, permission);
+    vmProperties.setProperty(UserPasswordAuthInit.PASSWORD, permission);
+    vmProperties.setProperty(SECURITY_CLIENT_AUTH_INIT, authInitFactory);
+    return vmProperties;
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/security/ClientDataAuthorizationUsingLegacySecurityWithFailoverDUnitTest.java b/geode-core/src/test/java/org/apache/geode/security/ClientDataAuthorizationUsingLegacySecurityWithFailoverDUnitTest.java
new file mode 100644
index 0000000..42e1391
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/security/ClientDataAuthorizationUsingLegacySecurityWithFailoverDUnitTest.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.security;
+
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_ACCESSOR;
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTHENTICATOR;
+import static org.apache.geode.distributed.ConfigurationProperties.SECURITY_CLIENT_AUTH_INIT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Consumer;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.util.CacheListenerAdapter;
+import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.security.templates.SimpleAccessController;
+import org.apache.geode.security.templates.SimpleAuthenticator;
+import org.apache.geode.security.templates.UserPasswordAuthInit;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.standalone.VersionManager;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.categories.SecurityTest;
+import org.apache.geode.test.junit.rules.VMProvider;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+
+/**
+ * This test class reproduces the tests present in
+ * {@link ClientDataAuthorizationUsingLegacySecurityDUnitTest} and confirms that permissions are
+ * maintained over failover.
+ */
+@Category({DistributedTest.class, SecurityTest.class})
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class ClientDataAuthorizationUsingLegacySecurityWithFailoverDUnitTest {
+  @Rule
+  public ClusterStartupRule csRule = new ClusterStartupRule();
+
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private static String regionName = "testRegion";
+
+  // Data seeded onto the servers during setup; clients attempt to read these entries.
+  private static final String server_k1 = "server-key-1";
+  private static final String server_v1 = "server-value-1";
+  private static final String server_k2 = "server-key-2";
+  private static final String server_v2 = "server-value-2";
+  private static Map<String, String> serverData = new HashMap<>();
+
+  static {
+    serverData.put(server_k1, server_v1);
+    serverData.put(server_k2, server_v2);
+  }
+
+  // Data that clients attempt to write during the tests.
+  private static final String client_k1 = "client-key-1";
+  private static final String client_v1 = "client-value-1";
+  private static final String client_k2 = "client-key-2";
+  private static final String client_v2 = "client-value-2";
+  private static final String client_k3 = "client-key-3";
+  private static final String client_v3 = "client-value-3";
+
+  private static final String client_k4 = "client-key-4";
+  private static final String client_v4 = "client-value-4";
+  private static final String client_k5 = "client-key-5";
+  private static final String client_v5 = "client-value-5";
+
+  private static final String client_k6 = "client-key-6";
+  private static final String client_v6 = "client-value-6";
+  private static final String client_k7 = "client-key-7";
+  private static final String client_v7 = "client-value-7";
+  private static Map<String, String> clientData45 = new HashMap<>();
+
+  static {
+    clientData45.put(client_k4, client_v4);
+    clientData45.put(client_k5, client_v5);
+  }
+
+  private static Map<String, String> clientData67 = new HashMap<>();
+
+  static {
+    clientData67.put(client_k6, client_v6);
+    clientData67.put(client_k7, client_v7);
+  }
+
+  // Test against every client version
+  @Parameterized.Parameter
+  public String clientVersion;
+
+  /** Supplies one parameterized run per version reported by the {@link VersionManager}. */
+  @Parameterized.Parameters(name = "clientVersion={0}")
+  public static Collection<String> data() {
+    VersionManager versionManager = VersionManager.getInstance();
+    return versionManager.getVersions();
+  }
+
+  /**
+   * Starts a locator and two servers with the "cluster,data" credentials, creates the replicated
+   * test region on both servers, and seeds it with {@code serverData} from server 1.
+   */
+  @Before
+  public void setup() throws Exception {
+    Properties memberProps = getVMPropertiesWithPermission("cluster,data");
+
+    locator = csRule.startLocatorVM(0, memberProps);
+    server1 = csRule.startServerVM(1, memberProps, locator.getPort());
+    server2 = csRule.startServerVM(2, memberProps, locator.getPort());
+
+    // Server 1 creates the region with a listener attached and writes the initial entries.
+    server1.invoke(() -> {
+      RegionFactory<String, String> factory =
+          ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.REPLICATE);
+      factory.addCacheListener(new ClientAuthorizationFailoverTestListener());
+      Region<String, String> seededRegion = factory.create(regionName);
+      seededRegion.putAll(serverData);
+    });
+
+    // Server 2 creates the same replicated region and must see the seeded entries.
+    server2.invoke(() -> {
+      RegionFactory<String, String> factory =
+          ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.REPLICATE);
+      Region<String, String> replica = factory.create(regionName);
+      assertThat(replica.getAll(serverData.keySet())).containsAllEntriesOf(serverData);
+    });
+  }
+
+  /**
+   * A client holding only the "dataRead" permission can get but not put, both before and after
+   * the server that its pool reports as primary has been shut down.
+   */
+  @Test
+  public void dataReaderCanStillOnlyReadAfterFailover() throws Exception {
+    ClientVM client = createAndInitializeClientAndCache("dataRead");
+
+    // Before failover: client should be able to read and not write.
+    assertClientCanReadButNotWrite(client);
+
+    // Find the server that the client's pool currently reports as primary.
+    VMProvider primaryServer = determinePrimaryServer(client);
+
+    // Bring down primary server
+    primaryServer.invoke(() -> ClusterStartupRule.getCache().close());
+
+    // Confirm failover
+    VMProvider secondaryServer = (server1.equals(primaryServer)) ? server2 : server1;
+    assertThat(secondaryServer).isEqualTo(determinePrimaryServer(client));
+
+    // After failover: client should still only be able to read and not write.
+    assertClientCanReadButNotWrite(client);
+
+    // Confirm that no puts went through
+    secondaryServer.invoke(() -> {
+      assertThat(ClusterStartupRule.getCache().getRegion(regionName))
+          .containsOnlyKeys(server_k1, server_k2).containsAllEntriesOf(serverData);
+    });
+  }
+
+  /**
+   * Runs in the client VM and asserts that every get variant returns the seeded server data while
+   * every put variant fails with a {@link NotAuthorizedException} cause.
+   */
+  private static void assertClientCanReadButNotWrite(ClientVM client) {
+    client.invoke(() -> {
+      Region<String, String> region = ClusterStartupRule.getClientCache().getRegion(regionName);
+      // Assert that the client can get
+      assertThat(region.get(server_k1)).isEqualTo(server_v1);
+      assertThat(region.get(server_k2, null)).isEqualTo(server_v2);
+      assertThat(region.getAll(serverData.keySet())).containsAllEntriesOf(serverData);
+      assertThat(region.getAll(serverData.keySet(), null)).containsAllEntriesOf(serverData);
+      // Assert that the client cannot put
+      assertThatThrownBy(() -> region.put(client_k1, client_v1))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.put(client_k2, client_v2, null))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.putIfAbsent(client_k3, client_v3))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.putAll(clientData45))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.putAll(clientData67, null))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+    });
+  }
+
+  /**
+   * A client holding only the "dataWrite" permission can put but not get, both before and after
+   * the server that its pool reports as primary has been shut down.
+   */
+  @Test
+  public void dataWriterCanStillOnlyWriteAfterFailover() throws Exception {
+    ClientVM client = createAndInitializeClientAndCache("dataWrite");
+
+    // Before failover: client should be able to write but not read.
+    assertClientCanWriteButNotRead(client);
+
+    // Find the server that the client's pool currently reports as primary.
+    VMProvider primaryServer = determinePrimaryServer(client);
+
+    // Bring down primary server
+    primaryServer.invoke(() -> ClusterStartupRule.getCache().close());
+
+    // Confirm failover
+    VMProvider secondaryServer = (server1.equals(primaryServer)) ? server2 : server1;
+    assertThat(secondaryServer).isEqualTo(determinePrimaryServer(client));
+
+    // After failover: client should still only be able to write and not read.
+    assertClientCanWriteButNotRead(client);
+  }
+
+  /**
+   * Runs in the client VM and asserts that every put variant completes without throwing, that
+   * single-key gets fail with a {@link NotAuthorizedException} cause, and that getAll returns
+   * nothing.
+   */
+  private static void assertClientCanWriteButNotRead(ClientVM client) {
+    client.invoke(() -> {
+      Region<String, String> region = ClusterStartupRule.getClientCache().getRegion(regionName);
+      // Authorized puts simply do not throw.
+      region.put(client_k1, client_v1);
+      region.put(client_k2, client_v2, null);
+      region.putIfAbsent(client_k3, client_v3);
+      region.putAll(clientData45);
+      region.putAll(clientData67, null);
+      // Assert that the client cannot get
+      assertThatThrownBy(() -> region.get(server_k1))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.get(server_k2, null))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      // An unauthorized getAll does not throw; it just does not return the requested values.
+      // See GEODE-3632.
+      assertThat(region.getAll(serverData.keySet())).isEmpty();
+      assertThat(region.getAll(serverData.keySet(), null)).isEmpty();
+    });
+  }
+
+  /**
+   * A client holding the "dataRead" permission can register and unregister interest, both before
+   * and after the server that its pool reports as primary has been shut down.
+   */
+  @Test
+  public void dataReaderCanRegisterAndUnregisterAcrossFailover() throws Exception {
+    ClientVM client = createAndInitializeClientAndCache("dataRead");
+
+    // Before failover: client should be able to register and unregister interests.
+    registerAndUnregisterInterests(client);
+
+    // Find the server that the client's pool currently reports as primary.
+    VMProvider primaryServer = determinePrimaryServer(client);
+
+    // Bring down primary server
+    primaryServer.invoke(() -> ClusterStartupRule.getCache().close());
+
+    // Confirm failover
+    VMProvider secondaryServer = (server1.equals(primaryServer)) ? server2 : server1;
+    assertThat(secondaryServer).isEqualTo(determinePrimaryServer(client));
+
+    // After failover: the same interest operations must still be permitted.
+    registerAndUnregisterInterests(client);
+  }
+
+  /**
+   * Runs the key-based and regex-based interest operations in the client VM; the test passes as
+   * long as none of them throws.
+   */
+  private static void registerAndUnregisterInterests(ClientVM client) {
+    client.invoke(() -> {
+      Region<String, String> region = ClusterStartupRule.getClientCache().getRegion(regionName);
+      region.unregisterInterest(client_k1);
+      region.registerInterest(client_k1);
+      region.registerInterestRegex("client-.*");
+      region.unregisterInterestRegex("client-.*");
+    });
+  }
+
+  /**
+   * A client holding only the "dataWrite" permission is refused when registering interest, both
+   * before and after the server that its pool reports as primary has been shut down.
+   */
+  @Test
+  public void dataWriterCannotRegisterInterestAcrossFailover() throws Exception {
+    // Same client and pool setup as the other tests, via the shared helper.
+    ClientVM client = createAndInitializeClientAndCache("dataWrite");
+
+    // Before failover: client should NOT be able to register interests.
+    // Attempts to unregister will fail client-side. The client maintains its own lists of
+    // interests and, since registration failed, any unregistering of interest will prematurely
+    // terminate before contacting any server. No authorization is attempted.
+    assertClientCannotRegisterInterest(client);
+
+    // Find the server that the client's pool currently reports as primary.
+    VMProvider primaryServer = determinePrimaryServer(client);
+
+    // Bring down primary server
+    primaryServer.invoke(() -> ClusterStartupRule.getCache().close());
+
+    // Confirm failover
+    VMProvider secondaryServer = (server1.equals(primaryServer)) ? server2 : server1;
+    assertThat(secondaryServer).isEqualTo(determinePrimaryServer(client));
+
+    // After failover: registration must still be refused.
+    assertClientCannotRegisterInterest(client);
+  }
+
+  /**
+   * Runs in the client VM and asserts that key-based and regex-based interest registration both
+   * fail with a {@link NotAuthorizedException} cause.
+   */
+  private static void assertClientCannotRegisterInterest(ClientVM client) {
+    client.invoke(() -> {
+      Region<String, String> region = ClusterStartupRule.getClientCache().getRegion(regionName);
+      assertThatThrownBy(() -> region.registerInterest(client_k1))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+      assertThatThrownBy(() -> region.registerInterestRegex("client-.*"))
+          .hasCauseInstanceOf(NotAuthorizedException.class);
+    });
+  }
+
+  /**
+   * Starts client VM 3 at {@code clientVersion}, using the given permission string as its
+   * credentials and a pool that lists both servers with subscriptions enabled, then creates a
+   * PROXY region named {@code regionName} in that client.
+   *
+   * @param withPermission permission string used as the client's username and password
+   * @return the started client VM
+   */
+  private ClientVM createAndInitializeClientAndCache(String withPermission) throws Exception {
+    final int firstServerPort = this.server1.getPort();
+    final int secondServerPort = this.server2.getPort();
+    Properties clientProps = getVMPropertiesWithPermission(withPermission);
+
+    // Cast to Serializable as well -- presumably because the setup is sent to the client VM.
+    Consumer<ClientCacheFactory> poolSetup =
+        (Serializable & Consumer<ClientCacheFactory>) factory -> factory
+            .addPoolServer("localhost", firstServerPort)
+            .addPoolServer("localhost", secondServerPort).setPoolSubscriptionEnabled(true)
+            .setPoolSubscriptionRedundancy(2);
+    ClientVM startedClient = csRule.startClientVM(3, clientProps, poolSetup, clientVersion);
+
+    // Create the client-side proxy region; the region handle itself is not needed here.
+    startedClient.invoke(() -> {
+      ClientRegionFactory<String, String> regionFactory =
+          ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY);
+      regionFactory.create(regionName);
+    });
+
+    return startedClient;
+  }
+
+  /**
+   * Asks the client's default pool for its primary port and maps that port back to a server VM:
+   * server1 when the port equals server1's port, otherwise server2.
+   */
+  private VMProvider determinePrimaryServer(ClientVM client) {
+    int portOfPrimary = client.invoke(() -> {
+      PoolImpl defaultPool = (PoolImpl) ClusterStartupRule.getClientCache().getDefaultPool();
+      return defaultPool.getPrimaryPort();
+    });
+
+    if (portOfPrimary == server1.getPort()) {
+      return server1;
+    }
+    return server2;
+  }
+
+  /**
+   * Builds security properties that select the legacy authenticator and access-controller test
+   * templates and use the given permission string as both username and password.
+   */
+  private Properties getVMPropertiesWithPermission(String permission) {
+    String authenticatorFactory = SimpleAuthenticator.class.getCanonicalName() + ".create";
+    String accessorFactory = SimpleAccessController.class.getCanonicalName() + ".create";
+    String authInitFactory = UserPasswordAuthInit.class.getCanonicalName() + ".create";
+
+    Properties securityProperties = new Properties();
+    // Legacy security framework: each callback is named by its static factory method.
+    securityProperties.setProperty(SECURITY_CLIENT_AUTHENTICATOR, authenticatorFactory);
+    securityProperties.setProperty(SECURITY_CLIENT_ACCESSOR, accessorFactory);
+    securityProperties.setProperty(SECURITY_CLIENT_AUTH_INIT, authInitFactory);
+    // The permission string doubles as the username and the password.
+    securityProperties.setProperty(UserPasswordAuthInit.USER_NAME, permission);
+    securityProperties.setProperty(UserPasswordAuthInit.PASSWORD, permission);
+    return securityProperties;
+  }
+
+  /** Cache listener whose only action is to write a log line whenever an entry is created. */
+  private static class ClientAuthorizationFailoverTestListener
+      extends CacheListenerAdapter<String, String> {
+    private static final Logger LOG = LogService.getLogger();
+
+    /** Logs a fixed message; the event itself is not inspected. */
+    @Override
+    public void afterCreate(EntryEvent<String, String> createEvent) {
+      LOG.info("In afterCreate");
+    }
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/security/templates/SimpleAccessController.java b/geode-core/src/test/java/org/apache/geode/security/templates/SimpleAccessController.java
new file mode 100644
index 0000000..2ebf4d8
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/security/templates/SimpleAccessController.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.security.templates;
+
+import java.security.Principal;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.operations.OperationContext;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.security.AccessControl;
+import org.apache.geode.security.NotAuthorizedException;
+
+/**
+ * A test implementation of the legacy {@link org.apache.geode.security.AccessControl}, mirroring
+ * the structure of {@link org.apache.geode.examples.SimpleSecurityManager}. An authenticated user's
+ * permissions are defined by the username itself, e.g., user "dataRead" has permissions DATA:READ
+ * and user "data,cluster" has permissions DATA and CLUSTER.
+ */
+public class SimpleAccessController implements AccessControl {
+  // Kept per instance: create() hands out a new controller on every call, so the principal given
+  // to one controller's init() must not be overwritten by another controller's init(), nor
+  // cleared by another controller's close().
+  private Principal principal;
+
+  /**
+   * Remembers the principal whose name defines the permissions this controller will grant.
+   *
+   * @param principal the authenticated principal; its {@code toString()} is read as a
+   *        comma-separated list of granted roles
+   * @param remoteMember not used by this implementation
+   * @param cache not used by this implementation
+   */
+  @Override
+  public void init(Principal principal, DistributedMember remoteMember, Cache cache)
+      throws NotAuthorizedException {
+    this.principal = principal;
+  }
+
+  /** Factory method referenced by name from the security-client-accessor property. */
+  public static AccessControl create() {
+    return new SimpleAccessController();
+  }
+
+  /**
+   * Maps the operation code to either DATA:READ or DATA:WRITE and checks it against the roles of
+   * the principal given to {@link #init}. Every other operation is refused.
+   *
+   * @param regionName not consulted; permissions here are not region-specific
+   * @param context supplies the operation code being authorized
+   * @return {@code true} only if the principal holds a role covering the operation
+   */
+  @Override
+  public boolean authorizeOperation(String regionName, OperationContext context) {
+    switch (context.getOperationCode()) {
+      case GET:
+      case REGISTER_INTEREST:
+      case UNREGISTER_INTEREST:
+      case CONTAINS_KEY:
+      case KEY_SET:
+      case QUERY:
+      case EXECUTE_CQ:
+      case STOP_CQ:
+      case CLOSE_CQ:
+        return authorize(principal, "DATA:READ");
+      case PUT:
+      case PUTALL:
+      case REMOVEALL:
+      case DESTROY:
+      case INVALIDATE:
+        return authorize(principal, "DATA:WRITE");
+      // Listed explicitly to show that these are refused on purpose, not by oversight.
+      case REGION_CLEAR:
+      case REGION_CREATE:
+      case REGION_DESTROY:
+      case EXECUTE_FUNCTION:
+      case GET_DURABLE_CQS:
+      default:
+        return false;
+    }
+  }
+
+  /**
+   * Returns whether any comma-separated role in the principal's name is a case-insensitive prefix
+   * of the permission with its colon removed, e.g. role "data" or "dataRead" grants "DATA:READ".
+   * A missing principal grants nothing, and neither does an empty role.
+   */
+  private boolean authorize(Principal principal, String permission) {
+    if (principal == null) {
+      // Not initialized yet, or already closed.
+      return false;
+    }
+    String roleList = principal.toString();
+    if (roleList == null) {
+      return false;
+    }
+    String permissionString = permission.replace(":", "");
+    for (String role : roleList.split(",")) {
+      // An empty role would be a prefix of every permission, so it must be skipped explicitly.
+      // regionMatches(ignoreCase = true, ...) is a locale-independent, case-insensitive prefix
+      // test; it returns false when the role is longer than the permission.
+      if (!role.isEmpty() && permissionString.regionMatches(true, 0, role, 0, role.length())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Forgets the principal; later authorization requests on this controller are refused. */
+  @Override
+  public void close() {
+    principal = null;
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/security/templates/SimpleAuthenticator.java b/geode-core/src/test/java/org/apache/geode/security/templates/SimpleAuthenticator.java
new file mode 100644
index 0000000..326883b
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/security/templates/SimpleAuthenticator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.security.templates;
+
+import java.security.Principal;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+
+import org.apache.geode.LogWriter;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.geode.security.Authenticator;
+
+/**
+ * A test implementation of the legacy {@link org.apache.geode.security.Authenticator}, mirroring
+ * the structure of {@link org.apache.geode.examples.SimpleSecurityManager}. Authenticates a user
+ * when the username matches the password, which in turn will match the user's permissions in
+ * {@link org.apache.geode.security.templates.SimpleAccessController}.
+ */
+public class SimpleAuthenticator implements Authenticator {
+  /** No configuration is needed; all arguments are ignored. */
+  @Override
+  public void init(Properties securityProps, LogWriter systemLogger, LogWriter securityLogger)
+      throws AuthenticationFailedException {}
+
+  /** Factory method referenced by name from the security-client-authenticator property. */
+  public static Authenticator create() {
+    return new SimpleAuthenticator();
+  }
+
+  /**
+   * Authenticates a user whose "security-username" is non-blank and equal to its
+   * "security-password".
+   *
+   * @param props the credentials presented by the connecting member
+   * @param member not used by this implementation
+   * @return a principal named after the username
+   * @throws AuthenticationFailedException if the username is missing or blank, or does not match
+   *         the password
+   */
+  @Override
+  public Principal authenticate(Properties props, DistributedMember member)
+      throws AuthenticationFailedException {
+    // Expect "security-username" and "security-password" to (a) match and (b) define permissions.
+    String username = props.getProperty("security-username");
+    String password = props.getProperty("security-password");
+    // A missing or blank username must be rejected rather than waved through: it would otherwise
+    // produce a principal with no usable name.
+    if (StringUtils.isBlank(username)) {
+      throw new AuthenticationFailedException(
+          "SimpleAuthenticator expects a non-blank username.");
+    }
+    if (!username.equals(password)) {
+      throw new AuthenticationFailedException(
+          "SimpleAuthenticator expects username to match password.");
+    }
+    return new UsernamePrincipal(username);
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {}
+}

-- 
To stop receiving notification emails like this one, please contact
prhomberg@apache.org.