You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2017/11/07 01:33:30 UTC
[geode] branch develop updated: GEODE-3637: Revert changes to
client queue initialization
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f53bdf0 GEODE-3637: Revert changes to client queue initialization
f53bdf0 is described below
commit f53bdf0306c9fb522d6aa64a855a0415b158b8d9
Author: kohlmu-pivotal <uk...@pivotal.io>
AuthorDate: Mon Nov 6 16:26:42 2017 -0800
GEODE-3637: Revert changes to client queue initialization
---
.../internal/cache/tier/sockets/AcceptorImpl.java | 312 +++++++++------------
.../cache/tier/sockets/ServerConnection.java | 51 ++--
.../sockets/AcceptorImplClientQueueDUnitTest.java | 263 -----------------
.../apache/geode/test/dunit/rules/CacheRule.java | 22 +-
4 files changed, 157 insertions(+), 491 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 21a0ac3..5b289a9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -103,7 +103,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
private static final Logger logger = LogService.getLogger();
private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit");
- private static final int HANDSHAKER_DEFAULT_POOL_SIZE = 4;
protected final CacheServerStats stats;
private final int maxConnections;
@@ -116,11 +115,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
private final ThreadPoolExecutor hsPool;
/**
- * A pool used to process client-queue-initializations.
- */
- private final ThreadPoolExecutor clientQueueInitPool;
-
- /**
* The port on which this acceptor listens for client connections
*/
private final int localPort;
@@ -540,126 +534,103 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
this.clientNotifier.getStats());
- pool = initializeServerConnectionThreadPool();
- hsPool = initializeHandshakerThreadPool();
- clientQueueInitPool = initializeClientQueueInitializerThreadPool();
-
- isAuthenticationRequired = this.securityService.isClientSecurityRequired();
-
- isIntegratedSecurity = this.securityService.isIntegratedSecurity();
-
- String postAuthzFactoryName =
- this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
-
- isPostAuthzCallbackPresent =
- (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
- }
-
- private ThreadPoolExecutor initializeHandshakerThreadPool() throws IOException {
- String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
- final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
- ThreadFactory socketThreadFactory = new ThreadFactory() {
- AtomicInteger connNum = new AtomicInteger(-1);
-
- @Override
- public Thread newThread(Runnable command) {
- String threadName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
- getStats().incAcceptThreadsCreated();
- return new Thread(socketThreadGroup, command, threadName);
- }
- };
- try {
- final BlockingQueue blockingQueue = new SynchronousQueue();
- final RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
- public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
- try {
- blockingQueue.put(r);
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt(); // preserve the state
- throw new RejectedExecutionException(
- LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
+ {
+ ThreadPoolExecutor tmp_pool = null;
+ String gName = "ServerConnection "
+ // + serverSock.getInetAddress()
+ + "on port " + this.localPort;
+ final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
+
+ ThreadFactory socketThreadFactory = new ThreadFactory() {
+ int connNum = -1;
+
+ public Thread newThread(final Runnable command) {
+ int tnum;
+ synchronized (this) {
+ tnum = ++connNum;
}
+ String tName = socketThreadGroup.getName() + " Thread " + tnum;
+ getStats().incConnectionThreadsCreated();
+ Runnable r = new Runnable() {
+ public void run() {
+ try {
+ command.run();
+ } catch (CancelException e) { // bug 39463
+ // ignore
+ } finally {
+ ConnectionTable.releaseThreadsSockets();
+ }
+ }
+ };
+ return new Thread(socketThreadGroup, r, tName);
}
};
- logger.warn("Handshaker max Pool size: " + HANDSHAKE_POOL_SIZE);
- return new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, blockingQueue,
- socketThreadFactory, rejectedExecutionHandler);
- } catch (IllegalArgumentException poolInitException) {
- this.stats.close();
- this.serverSock.close();
- this.pool.shutdownNow();
- throw poolInitException;
+ try {
+ if (isSelector()) {
+ tmp_pool = new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
+ getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
+ } else {
+ tmp_pool = new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
+ TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
+ }
+ } catch (IllegalArgumentException poolInitException) {
+ this.stats.close();
+ this.serverSock.close();
+ throw poolInitException;
+ }
+ this.pool = tmp_pool;
}
- }
-
- private ThreadPoolExecutor initializeClientQueueInitializerThreadPool() throws IOException {
- final ThreadGroup clientQueueThreadGroup =
- LoggingThreadGroup.createThreadGroup("Client Queue Initialization ", logger);
+ {
+ ThreadPoolExecutor tmp_hsPool = null;
+ String gName = "Handshaker " + serverSock.getInetAddress() + ":" + this.localPort;
+ final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
- ThreadFactory clientQueueThreadFactory = new ThreadFactory() {
- AtomicInteger connNum = new AtomicInteger(-1);
+ ThreadFactory socketThreadFactory = new ThreadFactory() {
+ int connNum = -1;
- @Override
- public Thread newThread(final Runnable command) {
- String threadName =
- clientQueueThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
- Runnable runnable = new Runnable() {
- public void run() {
- try {
- command.run();
- } catch (CancelException e) {
- logger.debug("Client Queue Initialization was canceled.", e);
- }
+ public Thread newThread(Runnable command) {
+ int tnum;
+ synchronized (this) {
+ tnum = ++connNum;
}
- };
- return new Thread(clientQueueThreadGroup, runnable, threadName);
- }
- };
- return new PooledExecutorWithDMStats(new SynchronousQueue(), 16, getStats().getCnxPoolHelper(),
- clientQueueThreadFactory, 60000);
- }
-
- private ThreadPoolExecutor initializeServerConnectionThreadPool() throws IOException {
- String gName = "ServerConnection "
- // + serverSock.getInetAddress()
- + "on port " + this.localPort;
- final ThreadGroup socketThreadGroup = LoggingThreadGroup.createThreadGroup(gName, logger);
-
- ThreadFactory socketThreadFactory = new ThreadFactory() {
- AtomicInteger connNum = new AtomicInteger(-1);
-
- @Override
- public Thread newThread(final Runnable command) {
- String tName = socketThreadGroup.getName() + " Thread " + connNum.incrementAndGet();
- getStats().incConnectionThreadsCreated();
- Runnable r = new Runnable() {
- public void run() {
+ String tName = socketThreadGroup.getName() + " Thread " + tnum;
+ getStats().incAcceptThreadsCreated();
+ return new Thread(socketThreadGroup, command, tName);
+ }
+ };
+ try {
+ final BlockingQueue bq = new SynchronousQueue();
+ final RejectedExecutionHandler reh = new RejectedExecutionHandler() {
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor pool) {
try {
- command.run();
- } catch (CancelException e) { // bug 39463
- // ignore
- } finally {
- ConnectionTable.releaseThreadsSockets();
+ bq.put(r);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt(); // preserve the state
+ throw new RejectedExecutionException(
+ LocalizedStrings.AcceptorImpl_INTERRUPTED.toLocalizedString(), ex);
}
}
};
- return new Thread(socketThreadGroup, r, tName);
- }
- };
- try {
- if (isSelector()) {
- return new PooledExecutorWithDMStats(new LinkedBlockingQueue(), this.maxThreads,
- getStats().getCnxPoolHelper(), socketThreadFactory, Integer.MAX_VALUE);
- } else {
- return new ThreadPoolExecutor(MINIMUM_MAX_CONNECTIONS, this.maxConnections, 0L,
- TimeUnit.MILLISECONDS, new SynchronousQueue(), socketThreadFactory);
+ tmp_hsPool = new ThreadPoolExecutor(1, HANDSHAKE_POOL_SIZE, 60, TimeUnit.SECONDS, bq,
+ socketThreadFactory, reh);
+ } catch (IllegalArgumentException poolInitException) {
+ this.stats.close();
+ this.serverSock.close();
+ this.pool.shutdownNow();
+ throw poolInitException;
}
- } catch (IllegalArgumentException poolInitException) {
- this.stats.close();
- this.serverSock.close();
- throw poolInitException;
+ this.hsPool = tmp_hsPool;
}
+
+ isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+
+ isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+
+ String postAuthzFactoryName =
+ this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+
+ isPostAuthzCallbackPresent =
+ (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) ? true : false;
}
public long getAcceptorId() {
@@ -695,8 +666,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
@Deprecated
private static final int DEPRECATED_SELECTOR_POOL_SIZE =
Integer.getInteger("BridgeServer.SELECTOR_POOL_SIZE", 16).intValue();
- private static final int HANDSHAKE_POOL_SIZE = Integer
- .getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", HANDSHAKER_DEFAULT_POOL_SIZE).intValue();
+ private static final int HANDSHAKE_POOL_SIZE =
+ Integer.getInteger("BridgeServer.HANDSHAKE_POOL_SIZE", 4).intValue();
@Override
public void start() throws IOException {
@@ -831,9 +802,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
* @see SystemFailure#loadEmergencyClasses()
*/
public static void loadEmergencyClasses() {
- if (emergencyClassesLoaded) {
+ if (emergencyClassesLoaded)
return;
- }
emergencyClassesLoaded = true;
CachedRegionHelper.loadEmergencyClasses();
ServerConnection.loadEmergencyClasses();
@@ -900,9 +870,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
private Selector tmpSel;
private void checkForStuckKeys() {
- if (!WORKAROUND_SELECTOR_BUG) {
+ if (!WORKAROUND_SELECTOR_BUG)
return;
- }
if (tmpSel == null) {
try {
tmpSel = Selector.open();
@@ -918,9 +887,8 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
while (it.hasNext()) {
SelectionKey sk = (SelectionKey) it.next();
ServerConnection sc = (ServerConnection) sk.attachment();
- if (sc == null) {
+ if (sc == null)
continue;
- }
try {
sk.cancel();
this.selector.selectNow(); // clear the cancelled key
@@ -1072,7 +1040,40 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
break;
}
if (events == 0) {
+ // zeroEventsCount++;
+ // if (zeroEventsCount > 0) {
+ // zeroEventsCount = 0;
checkForStuckKeys();
+
+ // try {
+ // this.selector.close(); // this selector is sick!
+ // } catch (IOException ignore) {
+ // }
+ // this.selector = Selector.open();
+ // {
+ // Iterator it = selectorRegistrations.iterator();
+ // while (it.hasNext()) {
+ // ServerConnection sc = (ServerConnection)it.next();
+ // sc.registerWithSelector2(this.selector);
+ // }
+ // }
+ // }
+ // ArrayList al = new ArrayList();
+ // Iterator keysIt = this.selector.keys().iterator();
+ // while (keysIt.hasNext()) {
+ // SelectionKey sk = (SelectionKey)keysIt.next();
+ // al.add(sk.attachment());
+ // sk.cancel();
+ // }
+ // events = this.selector.selectNow();
+ // Iterator alIt = al.iterator();
+ // while (alIt.hasNext()) {
+ // ServerConnection sc = (ServerConnection)alIt.next();
+ // sc.registerWithSelector2(this.selector);
+ // }
+ // events = this.selector.select();
+ // } else {
+ // zeroEventsCount = 0;
}
while (events > 0) {
int cancelCount = 0;
@@ -1129,11 +1130,16 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
logger.warn(
LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED, rejected));
}
+ // } else if (key.isValid() && key.isConnectable()) {
+ // logger.info("DEBUG isConnectable and isValid key=" + key);
+ // finishCon(sc);
} else {
finishCon(sc);
if (key.isValid()) {
logger.warn(LocalizedMessage.create(
LocalizedStrings.AcceptorImpl_IGNORING_EVENT_ON_SELECTOR_KEY__0, key));
+ // } else {
+ // logger.info("DEBUG !isValid key=" + key);
}
}
} catch (CancelledKeyException ex) { // fix for bug 37739
@@ -1399,10 +1405,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
return this.clientServerCnxCount.get();
}
- public boolean isNotifyBySubscription() {
- return notifyBySubscription;
- }
-
protected void handleNewClientConnection(final Socket socket,
final ServerConnectionFactory serverConnectionFactory) throws IOException {
// Read the first byte. If this socket is being used for 'client to server'
@@ -1425,9 +1427,12 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
throw new EOFException();
}
- // GEODE-3637 - If the communicationMode is client Subscriptions, hand-off the client queue
- // initialization to be done in another threadPool
- if (initializeClientPools(socket, communicationMode)) {
+ if (communicationMode.isSubscriptionFeed()) {
+ boolean primary = communicationMode == CommunicationMode.PrimaryServerToClient;
+ logger.debug(":Bridge server: Initializing {} server-to-client communication socket: {}",
+ primary ? "primary" : "secondary", socket);
+ AcceptorImpl.this.clientNotifier.registerClient(socket, primary, this.acceptorId,
+ this.notifyBySubscription);
return;
}
@@ -1493,17 +1498,6 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
}
}
- private boolean initializeClientPools(Socket socket, CommunicationMode communicationMode) {
- if (communicationMode.isSubscriptionFeed()) {
- boolean isPrimaryServerToClient =
- communicationMode == CommunicationMode.PrimaryServerToClient;
- clientQueueInitPool
- .execute(new ClientQueueInitializerTask(socket, isPrimaryServerToClient, this));
- return true;
- }
- return false;
- }
-
private CommunicationMode getCommunicationModeForNonSelector(Socket socket) throws IOException {
socket.setSoTimeout(this.acceptTimeout);
this.socketCreator.configureServerSSLSocket(socket);
@@ -1668,6 +1662,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
* then calculate it.
* @return the ip address or host name this acceptor will listen on. An "" if all local addresses
* will be listened to.
+ *
* @since GemFire 5.7
*/
private static String calcBindHostName(Cache cache, String bindName) {
@@ -1796,6 +1791,7 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
* This method returns a thread safe structure which can be iterated over without worrying about
* ConcurrentModificationException. JMX MBeans/Commands need to iterate over this list to get
* client info.
+ *
*/
public ServerConnection[] getAllServerConnectionList() {
return this.allSCList;
@@ -1819,42 +1815,4 @@ public class AcceptorImpl implements Acceptor, Runnable, CommBufferPool {
releaseCommBuffer(Message.setTLCommBuffer(null));
}
-
- private class ClientQueueInitializerTask implements Runnable {
- private final Socket socket;
- private final boolean isPrimaryServerToClient;
- private final AcceptorImpl acceptor;
-
- public ClientQueueInitializerTask(Socket socket, boolean isPrimaryServerToClient,
- AcceptorImpl acceptor) {
- this.socket = socket;
- this.acceptor = acceptor;
- this.isPrimaryServerToClient = isPrimaryServerToClient;
- }
-
- @Override
- public void run() {
- logger.info(":Bridge server: Initializing {} server-to-client communication socket: {}",
- isPrimaryServerToClient ? "primary" : "secondary", socket);
- try {
- acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient,
- acceptor.getAcceptorId(), acceptor.isNotifyBySubscription());
- } catch (IOException ex) {
- closeSocket(socket);
- if (isRunning()) {
- if (!acceptor.loggedAcceptError) {
- acceptor.loggedAcceptError = true;
- if (ex instanceof SocketTimeoutException) {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION_DUE_TO_SOCKET_TIMEOUT));
- } else {
- logger.warn(LocalizedMessage.create(
- LocalizedStrings.AcceptorImpl_CACHE_SERVER_FAILED_ACCEPTING_CLIENT_CONNECTION__0,
- ex), ex);
- }
- }
- }
- }
- }
- }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 74451e5..0e510af 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -189,9 +189,7 @@ public abstract class ServerConnection implements Runnable {
*/
private volatile int requestSpecificTimeout = -1;
- /**
- * Tracks the id of the most recent batch to which a reply has been sent
- */
+ /** Tracks the id of the most recent batch to which a reply has been sent */
private int latestBatchIdReplied = -1;
/*
@@ -713,9 +711,8 @@ public abstract class ServerConnection implements Runnable {
// can be used.
initializeCommands();
// its initialized in verifyClientConnection call
- if (!getCommunicationMode().isWAN()) {
+ if (!getCommunicationMode().isWAN())
initializeClientUserAuths();
- }
}
if (TEST_VERSION_AFTER_HANDSHAKE_FLAG) {
Assert.assertTrue((this.handshake.getVersion().ordinal() == testVersionAfterHandshake),
@@ -895,9 +892,7 @@ public abstract class ServerConnection implements Runnable {
}
}
if (unregisterClient)// last serverconnection call all close on auth objects
- {
cleanClientAuths();
- }
this.clientUserAuths = null;
if (needsUnregister) {
this.acceptor.getClientHealthMonitor().removeConnection(this.proxyId, this);
@@ -922,9 +917,8 @@ public abstract class ServerConnection implements Runnable {
ClientUserAuths cua = new ClientUserAuths(proxyId.hashCode());
ClientUserAuths retCua = proxyIdVsClientUserAuths.putIfAbsent(proxyId, cua);
- if (retCua == null) {
+ if (retCua == null)
return cua;
- }
return retCua;
}
@@ -960,9 +954,8 @@ public abstract class ServerConnection implements Runnable {
boolean removed = this.clientUserAuths.removeSubject(aIds.getUniqueId());
// if not successfull, try the old way
- if (!removed) {
+ if (!removed)
removed = this.clientUserAuths.removeUserId(aIds.getUniqueId(), keepalive);
- }
return removed;
} catch (NullPointerException npe) {
@@ -1017,6 +1010,7 @@ public abstract class ServerConnection implements Runnable {
throw new AuthenticationFailedException("Authentication failed");
}
+
byte[] credBytes = msg.getPart(0).getSerializedForm();
credBytes = ((HandShake) this.handshake).decryptBytes(credBytes);
@@ -1130,7 +1124,6 @@ public abstract class ServerConnection implements Runnable {
public void run() {
setOwner();
-
if (getAcceptor().isSelector()) {
boolean finishedMsg = false;
try {
@@ -1143,7 +1136,9 @@ public abstract class ServerConnection implements Runnable {
finishedMsg = true;
}
}
- } catch (java.nio.channels.ClosedChannelException | CancelException ignore) {
+ } catch (java.nio.channels.ClosedChannelException ignore) {
+ // ok shutting down
+ } catch (CancelException e) {
// ok shutting down
} catch (IOException ex) {
logger.warn(
@@ -1192,7 +1187,6 @@ public abstract class ServerConnection implements Runnable {
* If registered with a selector then this will be the key we are registered with.
*/
// private SelectionKey sKey = null;
-
/**
* Register this connection with the given selector for read events. Note that switch the channel
* to non-blocking so it can be in a selector.
@@ -1208,8 +1202,7 @@ public abstract class ServerConnection implements Runnable {
}
public void registerWithSelector2(Selector s) throws IOException {
- /* this.sKey = */
- getSelectableChannel().register(s, SelectionKey.OP_READ, this);
+ /* this.sKey = */getSelectableChannel().register(s, SelectionKey.OP_READ, this);
}
/**
@@ -1232,6 +1225,7 @@ public abstract class ServerConnection implements Runnable {
}
/**
+ *
* @return String representing the DistributedSystemMembership of the Client VM
*/
public String getMembershipID() {
@@ -1271,11 +1265,10 @@ public abstract class ServerConnection implements Runnable {
}
protected int getClientReadTimeout() {
- if (this.requestSpecificTimeout == -1) {
+ if (this.requestSpecificTimeout == -1)
return this.handshake.getClientReadTimeout();
- } else {
+ else
return this.requestSpecificTimeout;
- }
}
protected boolean isProcessingMessage() {
@@ -1526,9 +1519,7 @@ public abstract class ServerConnection implements Runnable {
return this.name;
}
- /**
- * returns the name of this connection
- */
+ /** returns the name of this connection */
public String getName() {
return this.name;
}
@@ -1745,13 +1736,11 @@ public abstract class ServerConnection implements Runnable {
// for backward client it will be store in member variable userAuthId
// for other look "requestMsg" here and get unique-id from this to get the authzrequest
- if (!AcceptorImpl.isAuthenticationRequired()) {
+ if (!AcceptorImpl.isAuthenticationRequired())
return null;
- }
- if (AcceptorImpl.isIntegratedSecurity()) {
+ if (AcceptorImpl.isIntegratedSecurity())
return null;
- }
long uniqueId = getUniqueId();
@@ -1779,13 +1768,11 @@ public abstract class ServerConnection implements Runnable {
public AuthorizeRequestPP getPostAuthzRequest()
throws AuthenticationRequiredException, IOException {
- if (!AcceptorImpl.isAuthenticationRequired()) {
+ if (!AcceptorImpl.isAuthenticationRequired())
return null;
- }
- if (AcceptorImpl.isIntegratedSecurity()) {
+ if (AcceptorImpl.isIntegratedSecurity())
return null;
- }
// look client version and return authzrequest
// for backward client it will be store in member variable userAuthId
@@ -1812,9 +1799,7 @@ public abstract class ServerConnection implements Runnable {
return postAuthReq;
}
- /**
- * returns the member ID byte array to be used for creating EventID objects
- */
+ /** returns the member ID byte array to be used for creating EventID objects */
public byte[] getEventMemberIDByteArray() {
return this.memberIdByteArray;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
deleted file mode 100644
index c0b2d07..0000000
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplClientQueueDUnitTest.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.internal.cache.tier.sockets;
-
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.rmi.RemoteException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.awaitility.Awaitility;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.geode.cache.EntryEvent;
-import org.apache.geode.cache.InterestPolicy;
-import org.apache.geode.cache.InterestResultPolicy;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.SubscriptionAttributes;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.cache.server.ClientSubscriptionConfig;
-import org.apache.geode.cache.util.CacheListenerAdapter;
-import org.apache.geode.distributed.DistributedLockBlackboard;
-import org.apache.geode.distributed.DistributedLockBlackboardImpl;
-import org.apache.geode.internal.cache.DiskStoreAttributes;
-import org.apache.geode.internal.cache.InitialImageOperation;
-import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.test.dunit.AsyncInvocation;
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.rules.CacheRule;
-import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
-import org.apache.geode.test.dunit.rules.DistributedTestRule;
-import org.apache.geode.test.dunit.rules.SharedCountersRule;
-import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
-import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
-
-@Category(DistributedTest.class)
-public class AcceptorImplClientQueueDUnitTest implements Serializable {
- private final Host host = Host.getHost(0);
- private static final int numberOfEntries = 200;
- private static final AtomicInteger eventCount = new AtomicInteger(0);
- private static final AtomicBoolean completedClient2 = new AtomicBoolean(false);
-
- @ClassRule
- public static DistributedTestRule distributedTestRule = new DistributedTestRule();
-
- @Rule
- public CacheRule cacheRule =
- CacheRule.builder().createCacheIn(host.getVM(0)).createCacheIn(host.getVM(1))
- .addSystemProperty("BridgeServer.HANDSHAKE_POOL_SIZE", "1").build();
-
- @Rule
- public SerializableTestName name = new SerializableTestName();
-
- @Rule
- public SerializableTemporaryFolder tempDir = new SerializableTemporaryFolder();
-
- @Rule
- public DistributedRestoreSystemProperties restoreSystemProperties =
- new DistributedRestoreSystemProperties();
-
- private DistributedLockBlackboard blackboard = null;
-
- @Before
- public void setup() throws Exception {
- blackboard = DistributedLockBlackboardImpl.getInstance();
- }
-
- @After
- public void tearDown() throws RemoteException {
- blackboard.initCount();
- host.getAllVMs().forEach((vm) -> vm.invoke(() -> {
- InitialImageOperation.slowImageProcessing = 0;
- System.getProperties().remove("BridgeServer.HANDSHAKE_POOL_SIZE");
- }));
- }
-
- @Test
- public void testClientSubscriptionQueueBlockingConnectionInitialization() throws Exception {
- VM vm0 = host.getVM(0);
- VM vm1 = host.getVM(1);
- VM vm2 = host.getVM(2);
- VM vm3 = host.getVM(3);
- int vm0_port = vm0.invoke("Start server with subscription turned on", () -> {
- try {
- return createSubscriptionServer(cacheRule.getCache());
- } catch (IOException e) {
- return 0;
- }
- });
-
- vm2.invoke("Start Client1 with durable interest registration turned on", () -> {
- ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
- clientCacheFactory.setPoolSubscriptionEnabled(true);
- clientCacheFactory.setPoolSubscriptionRedundancy(1);
- clientCacheFactory.setPoolReadTimeout(200);
- clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
- ClientCache cache = clientCacheFactory.set("durable-client-id", "1")
- .set("durable-client-timeout", "300").set("mcast-port", "0").create();
- ClientRegionFactory<Object, Object> clientRegionFactory =
- cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
- Region region = clientRegionFactory.create("subscriptionRegion");
-
- region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
- cache.readyForEvents();
- cache.close(true);
- });
- vm3.invoke("Start Client2 to add entries to region", () -> {
- ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
- clientCacheFactory.addPoolServer(host.getHostName(), vm0_port);
- ClientCache cache = clientCacheFactory.set("mcast-port", "0").create();
- ClientRegionFactory<Object, Object> clientRegionFactory =
- cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
- Region region = clientRegionFactory.create("subscriptionRegion");
-
- for (int i = 0; i < numberOfEntries; i++) {
- region.put(i, i);
- }
- cache.close();
- });
-
- int vm1_port = vm1.invoke("Start server2 in with subscriptions turned on", () -> {
- try {
- int serverPort = createSubscriptionServer(cacheRule.getCache());
- InitialImageOperation.slowImageProcessing = 30;
- return serverPort;
- } catch (IOException e) {
- return 0;
- }
- });
-
- vm0.invoke("Turn on slow image processsing", () -> {
- InitialImageOperation.slowImageProcessing = 30;
- });
-
- AsyncInvocation<Boolean> completedClient1 =
- vm2.invokeAsync("Start Client1, expecting durable messages to be delivered", () -> {
-
- ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
- clientCacheFactory.setPoolSubscriptionEnabled(true);
- clientCacheFactory.setPoolSubscriptionRedundancy(1);
- clientCacheFactory.setPoolMinConnections(1);
- clientCacheFactory.setPoolMaxConnections(1);
- clientCacheFactory.setPoolReadTimeout(200);
- clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
- ClientCacheFactory cacheFactory = clientCacheFactory.set("durable-client-id", "1")
- .set("durable-client-timeout", "300").set("mcast-port", "0");
- blackboard.incCount();
- ClientCache cache = cacheFactory.create();
-
- ClientRegionFactory<Object, Object> clientRegionFactory =
- cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
- Region region = clientRegionFactory.addCacheListener(new CacheListenerAdapter() {
- @Override
- public void afterCreate(EntryEvent event) {
- eventCount.incrementAndGet();
- }
-
- @Override
- public void afterUpdate(EntryEvent event) {
- eventCount.incrementAndGet();
- }
- }).create("subscriptionRegion");
-
- region.registerInterestRegex(".*", InterestResultPolicy.NONE, true);
- cache.readyForEvents();
- Awaitility.await().atMost(10, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS)
- .until(() -> eventCount.get() == numberOfEntries);
- cache.close();
- return eventCount.get() == numberOfEntries;
- });
-
- vm3.invokeAsync("Start Client2 to add entries to region", () -> {
- while (true) {
- Thread.sleep(100);
- if (blackboard.getCount() == 1) {
- break;
- }
- }
- ClientCache cache = null;
- ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
- clientCacheFactory.setPoolRetryAttempts(0);
- clientCacheFactory.setPoolMinConnections(1);
- clientCacheFactory.setPoolMaxConnections(1);
- clientCacheFactory.setPoolReadTimeout(200);
- clientCacheFactory.setPoolSocketConnectTimeout(500);
- clientCacheFactory.addPoolServer(host.getHostName(), vm1_port);
- cache = clientCacheFactory.set("mcast-port", "0").create();
- ClientRegionFactory<Object, Object> clientRegionFactory =
- cache.createClientRegionFactory(ClientRegionShortcut.PROXY);
- Region region = clientRegionFactory.create("subscriptionRegion");
-
- int returnValue = 0;
- for (int i = 0; i < 100; i++) {
- returnValue = (int) region.get(i);
- }
- cache.close();
- completedClient2.set(returnValue == 99);
- });
- assertTrue(completedClient1.get());
- assertTrue(vm3.invoke(() -> completedClient2.get()));
- }
-
- private int createSubscriptionServer(InternalCache cache) throws IOException {
- initializeDiskStore(cache);
- initializeReplicateRegion(cache);
- return initializeCacheServerWithSubscription(host, cache);
- }
-
- private void initializeDiskStore(InternalCache cache) throws IOException {
- DiskStoreAttributes diskStoreAttributes = new DiskStoreAttributes();
- diskStoreAttributes.name = "clientQueueDS";
- diskStoreAttributes.diskDirs = new File[] {tempDir.newFolder(name + "_dir")};
- cache.createDiskStoreFactory(diskStoreAttributes).create("clientQueueDS");
- }
-
- private void initializeReplicateRegion(InternalCache cache) {
- cache.createRegionFactory(RegionShortcut.REPLICATE).setStatisticsEnabled(true)
- .setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL))
- .create("subscriptionRegion");
- }
-
- private int initializeCacheServerWithSubscription(Host host, InternalCache cache)
- throws IOException {
- CacheServer cacheServer1 = cache.addCacheServer(false);
- ClientSubscriptionConfig clientSubscriptionConfig = cacheServer1.getClientSubscriptionConfig();
- clientSubscriptionConfig.setEvictionPolicy("entry");
- clientSubscriptionConfig.setCapacity(5);
- clientSubscriptionConfig.setDiskStoreName("clientQueueDS");
- cacheServer1.setPort(0);
- cacheServer1.setHostnameForClients(host.getHostName());
- cacheServer1.start();
- return cacheServer1.getPort();
- }
-}
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
index b65bf86..dc42da8 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/rules/CacheRule.java
@@ -63,7 +63,6 @@ public class CacheRule extends DistributedExternalResource {
private final boolean disconnectAfter;
private final List<VM> createCacheInVMs;
private final Properties config;
- private final Properties systemProperties;
public static Builder builder() {
return new Builder();
@@ -75,19 +74,18 @@ public class CacheRule extends DistributedExternalResource {
this.disconnectAfter = builder.disconnectAfter;
this.createCacheInVMs = builder.createCacheInVMs;
this.config = builder.config;
- this.systemProperties = builder.systemProperties;
}
@Override
protected void before() {
if (createCacheInAll) {
- invoker().invokeInEveryVMAndController(() -> createCache(config, systemProperties));
+ invoker().invokeInEveryVMAndController(() -> createCache(config));
} else {
if (createCache) {
- createCache(config, systemProperties);
+ createCache(config);
}
for (VM vm : createCacheInVMs) {
- vm.invoke(() -> createCache(config, systemProperties));
+ vm.invoke(() -> createCache(config));
}
}
}
@@ -110,8 +108,7 @@ public class CacheRule extends DistributedExternalResource {
return cache.getInternalDistributedSystem();
}
- private static void createCache(final Properties config, final Properties systemProperties) {
- System.getProperties().putAll(systemProperties);
+ private static void createCache(final Properties config) {
cache = (InternalCache) new CacheFactory(config).create();
}
@@ -144,7 +141,6 @@ public class CacheRule extends DistributedExternalResource {
private boolean disconnectAfter;
private List<VM> createCacheInVMs = new ArrayList<>();
private Properties config = new Properties();
- private Properties systemProperties = new Properties();
public Builder() {
config.setProperty(LOCATORS, getLocators());
@@ -199,16 +195,6 @@ public class CacheRule extends DistributedExternalResource {
return this;
}
- public Builder addSystemProperty(final String key, final String value) {
- this.systemProperties.put(key, value);
- return this;
- }
-
- public Builder addSystemProperties(final Properties config) {
- this.systemProperties.putAll(config);
- return this;
- }
-
public CacheRule build() {
return new CacheRule(this);
}
--
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <co...@geode.apache.org>'].