You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/13 17:18:42 UTC
[21/22] geode git commit: Create ClientCachePutBench
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 3b0c2ff..379b65b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -43,7 +43,6 @@ import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.ClientSession;
import org.apache.geode.cache.DynamicRegionFactory;
@@ -111,7 +110,6 @@ import org.apache.shiro.util.ThreadState;
* It queues messages to be sent from the server to the client. It then reads those messages from
* the queue and sends them to the client.
*
- *
* @since GemFire 4.2
*/
@SuppressWarnings("synthetic-access")
@@ -119,155 +117,127 @@ public class CacheClientProxy implements ClientSession {
private static final Logger logger = LogService.getLogger();
/**
- * The socket between the server and the client
- */
- protected Socket _socket;
-
- private final AtomicBoolean _socketClosed = new AtomicBoolean();
-
- /**
- * A communication buffer used by each message we send to the client
+ * Notify the region when a client interest registration occurs. This tells the region to update
+ * access time when an update is to be pushed to a client. It is enabled only for
+ * <code>PartitionedRegion</code>s currently.
*/
- protected ByteBuffer _commBuffer;
+ private static final boolean NOTIFY_REGION_ON_INTEREST =
+ Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");
/**
- * The remote host's IP address string (cached for convenience)
+ * The number of times to peek on shutdown before giving up and shutting down
*/
- protected String _remoteHostAddress;
+ private static final int MAXIMUM_SHUTDOWN_PEEKS =
+ Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50);
/**
- * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
+ * Default value for slow starting time of dispatcher
*/
- protected volatile boolean isMarkedForRemoval = false;
+ private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
/**
- * @see #isMarkedForRemoval
+ * Key in the system property from which the slow starting time value will be retrieved
*/
- protected final Object isMarkedForRemovalLock = new Object();
+ private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
/**
- * The proxy id of the client represented by this proxy
+ * TODO: delete this and rewrite the tests that use this NOTE: this is NOT thread safe
*/
- protected ClientProxyMembershipID proxyID;
+ private static TestHook testHook;
/**
- * The GemFire cache
+ * TODO: delete this and rewrite the test that uses this A debug flag used for testing Backward
+ * compatibility
*/
- protected final GemFireCacheImpl _cache;
+ private static boolean afterMessageCreationForTesting = false;
/**
- * The list of keys that the client represented by this proxy is interested in (stored by region)
+ * TODO: delete this and rewrite the test that uses this for testing purposes, delays the start of
+ * the dispatcher thread
*/
- protected final ClientInterestList[] cils = new ClientInterestList[2];
+ private static boolean isSlowStartForTesting = false;
- /**
- * A thread that dispatches messages to the client
- */
- protected volatile MessageDispatcher _messageDispatcher;
+ private final AtomicBoolean socketClosed = new AtomicBoolean();
/**
- * The statistics for this proxy
+ * @see #isMarkedForRemoval
*/
- protected final CacheClientProxyStats _statistics;
-
- protected final AtomicReference _durableExpirationTask = new AtomicReference();
-
- protected SystemTimer durableTimer;
+ private final Object isMarkedForRemovalLock = new Object();
/**
- * Whether this dispatcher is paused
+ * The GemFire cache
*/
- protected volatile boolean _isPaused = true;
+ private final GemFireCacheImpl cache;
/**
- * True if we are connected to a client.
- */
- private volatile boolean connected = false;
- // /**
- // * A string representing interest in all keys
- // */
- // protected static final String ALL_KEYS = "ALL_KEYS";
- //
- /**
- * True if a marker message is still in the ha queue.
+ * The list of keys that the client represented by this proxy is interested in (stored by region)
*/
- private boolean markerEnqueued = false;
+ private final ClientInterestList[] cils = new ClientInterestList[2];
/**
- * The number of times to peek on shutdown before giving up and shutting down
+ * The statistics for this proxy
*/
- protected static final int MAXIMUM_SHUTDOWN_PEEKS = Integer
- .getInteger(DistributionConfig.GEMFIRE_PREFIX + "MAXIMUM_SHUTDOWN_PEEKS", 50).intValue();
+ private final CacheClientProxyStats _statistics;
- /**
- * The number of milliseconds to wait for an offering to the message queue
- */
- protected static final int MESSAGE_OFFER_TIME = 0;
-
- /**
- * The default maximum message queue size
- */
- // protected static final int MESSAGE_QUEUE_SIZE_DEFAULT = 230000;
+ private final AtomicReference _durableExpirationTask = new AtomicReference();
/** The message queue size */
- protected final int _maximumMessageCount;
+ private final int _maximumMessageCount;
/**
* The time (in seconds ) after which a message in the client queue will expire.
*/
- protected final int _messageTimeToLive;
+ private final int _messageTimeToLive;
/**
* The <code>CacheClientNotifier</code> registering this proxy.
*/
- protected final CacheClientNotifier _cacheClientNotifier;
+ private final CacheClientNotifier cacheClientNotifier;
- /**
- * Defaults to true; meaning do some logging of dropped client notification messages. Set the
- * system property to true to cause dropped messages to NOT be logged.
- */
- protected static final boolean LOG_DROPPED_MSGS =
- !Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "disableNotificationWarnings");
+ private final Object clientUserAuthsLock = new Object();
/**
- * for testing purposes, delays the start of the dispatcher thread
+ * The AcceptorImpl identifier to which the proxy is connected.
*/
- public static boolean isSlowStartForTesting = false;
+ private final long _acceptorId;
- /**
- * Default value for slow starting time of dispatcher
- */
- private static final long DEFAULT_SLOW_STARTING_TIME = 5000;
+ /** acceptor's setting for notifyBySubscription */
+ private final boolean notifyBySubscription;
+
+ private final Object queuedEventsSync = new Object();
/**
- * Key in the system property from which the slow starting time value will be retrieved
+ * A counter that keeps track of how many task iterations that have occurred since the last ping
+ * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments
+ * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent.
*/
- private static final String KEY_SLOW_START_TIME_FOR_TESTING = "slowStartTimeForTesting";
+ private final AtomicInteger pingCounter = new AtomicInteger();
- private boolean isPrimary;
+ private final Object drainLock = new Object();
- /** @since GemFire 5.7 */
- protected byte clientConflation = HandShake.CONFLATION_DEFAULT;
+ private final Object drainsInProgressLock = new Object();
+
+ private final SecurityService securityService;
/**
- * Flag to indicate whether to keep a durable client's queue alive
+ * Concurrency: protected by synchronization of {@link #isMarkedForRemovalLock}
*/
- boolean keepalive = false;
-
- private AccessControl postAuthzCallback;
- private Subject subject;
+ private volatile boolean isMarkedForRemoval = false;
/**
- * For multiuser environment..
+ * A thread that dispatches messages to the client
*/
- private ClientUserAuths clientUserAuths;
+ private volatile MessageDispatcher _messageDispatcher;
- private final Object clientUserAuthsLock = new Object();
+ /**
+ * Whether this dispatcher is paused
+ */
+ private volatile boolean _isPaused = true;
/**
- * The version of the client
+ * True if we are connected to a client.
*/
- private Version clientVersion;
+ private volatile boolean connected = false;
/**
* A map of region name as key and integer as its value. Basically, it stores the names of the
@@ -278,42 +248,60 @@ public class CacheClientProxy implements ClientSession {
*/
private volatile Map regionsWithEmptyDataPolicy = new HashMap();
+ /** To queue the events arriving during message dispatcher initialization */
+ private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents =
+ new ConcurrentLinkedQueue<Conflatable>();
+
+ private volatile boolean messageDispatcherInit = false;
+
/**
- * A debug flag used for testing Backward compatibility
+ * The socket between the server and the client
*/
- public static boolean AFTER_MESSAGE_CREATION_FLAG = false;
+ private Socket socket;
/**
- * Notify the region when a client interest registration occurs. This tells the region to update
- * access time when an update is to be pushed to a client. It is enabled only for
- * <code>PartitionedRegion</code>s currently.
+ * A communication buffer used by each message we send to the client
*/
- protected static final boolean NOTIFY_REGION_ON_INTEREST =
- Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");
+ private ByteBuffer _commBuffer;
/**
- * The AcceptorImpl identifier to which the proxy is connected.
+ * The remote host's IP address string (cached for convenience)
*/
- private final long _acceptorId;
+ private String _remoteHostAddress;
- /** acceptor's setting for notifyBySubscription */
- private final boolean notifyBySubscription;
+ /**
+ * The proxy id of the client represented by this proxy
+ */
+ private ClientProxyMembershipID proxyID;
- /** To queue the events arriving during message dispatcher initialization */
- private volatile ConcurrentLinkedQueue<Conflatable> queuedEvents =
- new ConcurrentLinkedQueue<Conflatable>();
+ /**
+ * True if a marker message is still in the ha queue.
+ */
+ private boolean markerEnqueued = false;
- private final Object queuedEventsSync = new Object();
+ private boolean isPrimary;
- private volatile boolean messageDispatcherInit = false;
+ /** @since GemFire 5.7 */
+ private byte clientConflation = HandShake.CONFLATION_DEFAULT;
/**
- * A counter that keeps track of how many task iterations that have occurred since the last ping
- * or message. The {@linkplain CacheClientNotifier#scheduleClientPingTask ping task} increments
- * it. Normal messages sent to the client reset it. If the counter reaches 3, a ping is sent.
+ * Flag to indicate whether to keep a durable client's queue alive
*/
- private final AtomicInteger pingCounter = new AtomicInteger();
+ private boolean keepalive = false;
+
+ private AccessControl postAuthzCallback;
+ private Subject subject;
+
+ /**
+ * For multiuser environment..
+ */
+ private ClientUserAuths clientUserAuths;
+
+ /**
+ * The version of the client
+ */
+ private Version clientVersion;
/** Date on which this instances was created */
private Date creationDate;
@@ -321,52 +309,75 @@ public class CacheClientProxy implements ClientSession {
/**
* true when the durable client associated with this proxy is being restarted and prevents cqs
* from being closed and drained
- **/
+ */
private boolean drainLocked = false;
- private final Object drainLock = new Object();
/** number of cq drains that are currently in progress **/
private int numDrainsInProgress = 0;
- private final Object drainsInProgressLock = new Object();
- private SecurityService securityService = SecurityService.getSecurityService();
+ static CacheClientProxy createCacheClientProxy(final CacheClientNotifier ccn,
+ final GemFireCacheImpl cache, final StatisticsFactory statsFactory,
+ final SecurityService securityService, final Socket socket,
+ final ClientProxyMembershipID proxyID, final boolean isPrimary, final byte clientConflation,
+ final Version clientVersion, final long acceptorId, final boolean notifyBySubscription) {
+
+ CacheClientProxy cacheClientProxy =
+ new CacheClientProxy(ccn, cache, statsFactory, securityService, socket, proxyID, isPrimary,
+ clientConflation, clientVersion, acceptorId, notifyBySubscription);
+
+ // Create the interest list
+ cacheClientProxy.cils[RegisterInterestTracker.interestListIndex] =
+ new ClientInterestList(cacheClientProxy, cacheClientProxy.proxyID);
+ // Create the durable interest list
+ cacheClientProxy.cils[RegisterInterestTracker.durableInterestListIndex] =
+ new ClientInterestList(cacheClientProxy, cacheClientProxy.getDurableId());
+
+ return cacheClientProxy;
+ }
/**
* Constructor.
*
* @param ccn The <code>CacheClientNotifier</code> registering this proxy
+ * @param cache
* @param socket The socket between the server and the client
* @param proxyID representing the Connection Proxy of the clien
* @param isPrimary The boolean stating whether this prozxy is primary
- * @throws CacheException {
- */
- protected CacheClientProxy(CacheClientNotifier ccn, Socket socket,
- ClientProxyMembershipID proxyID, boolean isPrimary, byte clientConflation,
- Version clientVersion, long acceptorId, boolean notifyBySubscription) throws CacheException {
+ * @param clientConflation
+ * @param clientVersion
+ */
+ private CacheClientProxy(final CacheClientNotifier ccn, final GemFireCacheImpl cache,
+ final StatisticsFactory statsFactory, final SecurityService securityService,
+ final Socket socket, final ClientProxyMembershipID proxyID, final boolean isPrimary,
+ final byte clientConflation, final Version clientVersion, final long acceptorId,
+ final boolean notifyBySubscription) {
initializeTransientFields(socket, proxyID, isPrimary, clientConflation, clientVersion);
- this._cacheClientNotifier = ccn;
- this._cache = (GemFireCacheImpl) ccn.getCache();
+ this.cacheClientNotifier = ccn;
+ this.cache = cache;
+ this.securityService = securityService;
this._maximumMessageCount = ccn.getMaximumMessageCount();
this._messageTimeToLive = ccn.getMessageTimeToLive();
this._acceptorId = acceptorId;
this.notifyBySubscription = notifyBySubscription;
- StatisticsFactory factory = this._cache.getDistributedSystem();
+
this._statistics =
- new CacheClientProxyStats(factory, "id_" + this.proxyID.getDistributedMember().getId()
- + "_at_" + this._remoteHostAddress + ":" + this._socket.getPort());
+ new CacheClientProxyStats(statsFactory, "id_" + this.proxyID.getDistributedMember().getId()
+ + "_at_" + this._remoteHostAddress + ":" + this.socket.getPort());
- // Create the interest list
- this.cils[RegisterInterestTracker.interestListIndex] =
- new ClientInterestList(this, this.proxyID);
- // Create the durable interest list
- this.cils[RegisterInterestTracker.durableInterestListIndex] =
- new ClientInterestList(this, this.getDurableId());
this.postAuthzCallback = null;
- this._cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
+ this.cacheClientNotifier.getAcceptorStats().incCurrentQueueConnections();
this.creationDate = new Date();
initializeClientAuths();
}
+ boolean isClientConflationOn() {
+ return this.clientConflation == HandShake.CONFLATION_ON;
+ }
+
+ boolean isClientConflationDefault() {
+ return this.clientConflation == HandShake.CONFLATION_ON;
+ }
+
private void initializeClientAuths() {
if (AcceptorImpl.isPostAuthzCallbackPresent())
this.clientUserAuths = ServerConnection.getClientUserAuths(this.proxyID);
@@ -411,13 +422,13 @@ public class CacheClientProxy implements ClientSession {
private void initializeTransientFields(Socket socket, ClientProxyMembershipID pid, boolean ip,
byte cc, Version vers) {
- this._socket = socket;
+ this.socket = socket;
this.proxyID = pid;
this.connected = true;
{
int bufSize = 1024;
try {
- bufSize = _socket.getSendBufferSize();
+ bufSize = this.socket.getSendBufferSize();
if (bufSize < 1024) {
bufSize = 1024;
}
@@ -450,7 +461,6 @@ public class CacheClientProxy implements ClientSession {
return this.notifyBySubscription;
}
-
/**
* Returns the DistributedMember represented by this proxy
*/
@@ -458,47 +468,6 @@ public class CacheClientProxy implements ClientSession {
return this.proxyID;
}
- // the following code was commented out simply because it was not used
- // /**
- // * Determines if the proxy represents the client host (and only the host, not
- // * necessarily the exact VM running on the host)
- // *
- // * @return Whether the proxy represents the client host
- // */
- // protected boolean representsClientHost(String clientHost)
- // {
- // // [bruce] TODO BUGBUGBUG: this should compare InetAddresses, not Strings
- // return this._remoteHostAddress.equals(clientHost);
- // }
-
- // protected boolean representsClientVM(DistributedMember remoteMember)
- // {
- // // logger.warn("Is input port " + clientPort + " contained in " +
- // // logger.warn("Does input host " + clientHost + " equal " +
- // // this._remoteHostAddress+ ": " + representsClientHost(clientHost));
- // // logger.warn("representsClientVM: " +
- // // (representsClientHost(clientHost) && containsPort(clientPort)));
- // return (proxyID.getDistributedMember().equals(remoteMember));
- // }
-
- // /**
- // * Determines if the CacheClientUpdater proxied by this instance is listening
- // * on the input clientHost and clientPort
- // *
- // * @param clientHost
- // * The host name of the client to compare
- // * @param clientPort
- // * The port number of the client to compare
- // *
- // * @return Whether the CacheClientUpdater proxied by this instance is
- // * listening on the input clientHost and clientPort
- // */
- // protected boolean representsCacheClientUpdater(String clientHost,
- // int clientPort)
- // {
- // return (clientPort == this._socket.getPort() && representsClientHost(clientHost));
- // }
-
protected boolean isMember(ClientProxyMembershipID memberId) {
return this.proxyID.equals(memberId);
}
@@ -522,11 +491,11 @@ public class CacheClientProxy implements ClientSession {
* @return the socket between the server and the client
*/
protected Socket getSocket() {
- return this._socket;
+ return this.socket;
}
public String getSocketHost() {
- return this._socket.getInetAddress().getHostAddress();
+ return this.socket.getInetAddress().getHostAddress();
}
protected ByteBuffer getCommBuffer() {
@@ -548,7 +517,7 @@ public class CacheClientProxy implements ClientSession {
* @return the remote host's port
*/
public int getRemotePort() {
- return this._socket.getPort();
+ return this.socket.getPort();
}
/**
@@ -593,7 +562,7 @@ public class CacheClientProxy implements ClientSession {
this.isMarkedForRemovalLock.wait();
} catch (InterruptedException e) {
interrupted = true;
- this._cache.getCancelCriterion().checkCancelInProgress(e);
+ this.cache.getCancelCriterion().checkCancelInProgress(e);
}
} // while
} finally {
@@ -621,7 +590,7 @@ public class CacheClientProxy implements ClientSession {
* @return the GemFire cache
*/
public GemFireCacheImpl getCache() {
- return this._cache;
+ return this.cache;
}
public Set<String> getInterestRegisteredRegions() {
@@ -649,7 +618,7 @@ public class CacheClientProxy implements ClientSession {
* @return this proxy's <code>CacheClientNotifier</code>
*/
protected CacheClientNotifier getCacheClientNotifier() {
- return this._cacheClientNotifier;
+ return this.cacheClientNotifier;
}
/**
@@ -852,8 +821,8 @@ public class CacheClientProxy implements ClientSession {
}
}
} catch (Exception ex) {
- if (this._cache.getSecurityLoggerI18n().warningEnabled()) {
- this._cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON,
+ if (this.cache.getSecurityLoggerI18n().warningEnabled()) {
+ this.cache.getSecurityLoggerI18n().warning(LocalizedStrings.TWO_ARG_COLON,
new Object[] {this, ex});
}
}
@@ -991,9 +960,9 @@ public class CacheClientProxy implements ClientSession {
}
private void closeSocket() {
- if (this._socketClosed.compareAndSet(false, true)) {
+ if (this.socketClosed.compareAndSet(false, true)) {
// Close the socket
- this._cacheClientNotifier.getSocketCloser().asyncClose(this._socket, this._remoteHostAddress,
+ this.cacheClientNotifier.getSocketCloser().asyncClose(this.socket, this._remoteHostAddress,
null);
getCacheClientNotifier().getAcceptorStats().decCurrentQueueConnections();
}
@@ -1008,7 +977,7 @@ public class CacheClientProxy implements ClientSession {
{
String remoteHostAddress = this._remoteHostAddress;
if (remoteHostAddress != null) {
- this._cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
+ this.cacheClientNotifier.getSocketCloser().releaseResourcesForAddress(remoteHostAddress);
this._remoteHostAddress = null;
}
}
@@ -1124,7 +1093,7 @@ public class CacheClientProxy implements ClientSession {
InterestResultPolicy policy, boolean isDurable, boolean receiveValues, int interestType) {
// Create a client interest message for the keyOfInterest
ClientInterestMessageImpl message = new ClientInterestMessageImpl(
- new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
+ new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
policy.getOrdinal(), isDurable, !receiveValues, ClientInterestMessageImpl.REGISTER);
// Notify all secondary proxies of a change in interest
@@ -1146,7 +1115,7 @@ public class CacheClientProxy implements ClientSession {
String regionName, Object keyOfInterest) {
// Get the initial value
Get70 request = (Get70) Get70.getCommand();
- LocalRegion lr = (LocalRegion) this._cache.getRegion(regionName);
+ LocalRegion lr = (LocalRegion) this.cache.getRegion(regionName);
Get70.Entry entry = request.getValueAndIsObject(lr, keyOfInterest, null, null);
boolean isObject = entry.isObject;
byte[] value = null;
@@ -1170,7 +1139,7 @@ public class CacheClientProxy implements ClientSession {
EventID eventId = null;
if (clientInterestMessage == null) {
// If the clientInterestMessage is null, create a new event id
- eventId = new EventID(this._cache.getDistributedSystem());
+ eventId = new EventID(this.cache.getDistributedSystem());
} else {
// If the clientInterestMessage is not null, base the event id off its event id to fix
// GEM-794.
@@ -1239,7 +1208,7 @@ public class CacheClientProxy implements ClientSession {
boolean isDurable, boolean receiveValues, int interestType) {
// Notify all secondary proxies of a change in interest
ClientInterestMessageImpl message = new ClientInterestMessageImpl(
- new EventID(this._cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
+ new EventID(this.cache.getDistributedSystem()), regionName, keyOfInterest, interestType,
(byte) 0, isDurable, !receiveValues, ClientInterestMessageImpl.UNREGISTER);
notifySecondariesOfInterestChange(message);
@@ -1269,17 +1238,9 @@ public class CacheClientProxy implements ClientSession {
.append("->").append(InterestType.getString(message.getInterestType()));
logger.debug(buffer.toString());
}
- this._cacheClientNotifier.deliverInterestChange(this.proxyID, message);
+ this.cacheClientNotifier.deliverInterestChange(this.proxyID, message);
}
- /*
- * protected void addFilterRegisteredClients(String regionName, Object keyOfInterest) { try {
- * this._cacheClientNotifier.addFilterRegisteredClients(regionName, this.proxyID); } catch
- * (RegionDestroyedException e) {
- * logger.warn(LocalizedStrings.CacheClientProxy_0_INTEREST_REG_FOR_0_FAILED, regionName + "->" +
- * keyOfInterest, e); } }
- */
-
/**
* Registers interest in the input region name and key
*
@@ -1293,7 +1254,7 @@ public class CacheClientProxy implements ClientSession {
cil.registerClientInterest(regionName, keyOfInterest, interestType, sendUpdatesAsInvalidates);
if (flushState) {
flushForInterestRegistration(regionName,
- this._cache.getDistributedSystem().getDistributedMember());
+ this.cache.getDistributedSystem().getDistributedMember());
}
HARegionQueue queue = getHARegionQueue();
if (queue != null) { // queue is null during initialization
@@ -1306,7 +1267,7 @@ public class CacheClientProxy implements ClientSession {
* interest. During queue creation it is the queue's image provider.
*/
public void flushForInterestRegistration(String regionName, DistributedMember target) {
- Region r = this._cache.getRegion(regionName);
+ Region r = this.cache.getRegion(regionName);
if (r == null) {
if (logger.isDebugEnabled()) {
logger.debug("Unable to find region '{}' to flush for interest registration", regionName);
@@ -1320,7 +1281,7 @@ public class CacheClientProxy implements ClientSession {
if (r instanceof PartitionedRegion) {
// need to flush all buckets. SFO should be changed to target buckets
// belonging to a particular PR, but it doesn't have that option right now
- sfo = new StateFlushOperation(this._cache.getDistributedSystem().getDistributionManager());
+ sfo = new StateFlushOperation(this.cache.getDistributedSystem().getDistributionManager());
} else {
sfo = new StateFlushOperation((DistributedRegion) r);
}
@@ -1378,7 +1339,7 @@ public class CacheClientProxy implements ClientSession {
if (getHARegionQueue() != null) {
if (flushState) {
flushForInterestRegistration(regionName,
- this._cache.getDistributedSystem().getDistributedMember());
+ this.cache.getDistributedSystem().getDistributedMember());
}
getHARegionQueue().setHasRegisteredInterest(true);
}
@@ -1643,7 +1604,7 @@ public class CacheClientProxy implements ClientSession {
if (logger.isDebugEnabled()) {
logger.debug("About to send message directly to {}", this);
}
- if (this._messageDispatcher != null && this._socket != null && !this._socket.isClosed()) {
+ if (this._messageDispatcher != null && this.socket != null && !this.socket.isClosed()) {
// If the socket is open, send the message to it
this._messageDispatcher.sendMessageDirectly(message);
if (logger.isDebugEnabled()) {
@@ -1759,7 +1720,7 @@ public class CacheClientProxy implements ClientSession {
if (this.isPrimary) {
// Add the marker to the queue
if (!processedMarker) {
- EventID eventId = new EventID(this._cache.getDistributedSystem());
+ EventID eventId = new EventID(this.cache.getDistributedSystem());
this._messageDispatcher.enqueueMarker(new ClientMarkerMessageImpl(eventId));
}
@@ -1810,30 +1771,18 @@ public class CacheClientProxy implements ClientSession {
@Override
public String toString() {
StringBuffer buffer = new StringBuffer();
- buffer.append("CacheClientProxy[")
- // .append("client proxy id=")
- .append(this.proxyID)
- // .append("; client host name=")
- // .append(this._socket.getInetAddress().getCanonicalHostName())
- // .append("; client host address=")
- // .append(this._remoteHostAddress)
- .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary)
- .append("; version=").append(clientVersion).append("]");
+ buffer.append("CacheClientProxy[").append(this.proxyID).append("; port=")
+ .append(this.socket.getPort()).append("; primary=").append(isPrimary).append("; version=")
+ .append(clientVersion).append("]");
return buffer.toString();
}
public String getState() {
StringBuffer buffer = new StringBuffer();
- buffer.append("CacheClientProxy[")
- // .append("client proxy id=")
- .append(this.proxyID)
- // .append("; client host name=")
- // .append(this._socket.getInetAddress().getCanonicalHostName())
- // .append("; client host address=")
- // .append(this._remoteHostAddress)
- .append("; port=").append(this._socket.getPort()).append("; primary=").append(isPrimary)
- .append("; version=").append(clientVersion).append("; paused=").append(isPaused())
- .append("; alive=").append(isAlive()).append("; connected=").append(isConnected())
+ buffer.append("CacheClientProxy[").append(this.proxyID).append("; port=")
+ .append(this.socket.getPort()).append("; primary=").append(isPrimary).append("; version=")
+ .append(clientVersion).append("; paused=").append(isPaused()).append("; alive=")
+ .append(isAlive()).append("; connected=").append(isConnected())
.append("; isMarkedForRemoval=").append(isMarkedForRemoval).append("]");
if (_messageDispatcher != null && isAlive()) {
@@ -1844,15 +1793,7 @@ public class CacheClientProxy implements ClientSession {
}
public boolean isPrimary() {
- // boolean primary = this._messageDispatcher.isAlive()
- // || this._messageDispatcher._messageQueue.isPrimary();
- boolean primary = this.isPrimary;
- // System.out.println(this + ": DISPATCHER IS ALIVE: " + this._messageDispatcher.isAlive());
- // System.out.println(this + ": DISPATCHER QUEUE IS PRIMARY: " +
- // this._messageDispatcher._messageQueue.isPrimary());
- // System.out.println(this + ": IS PRIMARY: " + primary);
- return primary;
- // return this.isPrimary ;
+ return this.isPrimary;
}
protected boolean basicIsPrimary() {
@@ -1863,16 +1804,10 @@ public class CacheClientProxy implements ClientSession {
this.isPrimary = isPrimary;
}
- // private static int nextId = 0;
- // static protected int getNextId() {
- // synchronized (CacheClientProxy.class) {
- // return ++nextId;
- // }
- // }
- /*
+ /**
* Return this client's HA region queue
*
- * @returns - HARegionQueue of the client
+ * @return HARegionQueue of the client
*/
public HARegionQueue getHARegionQueue() {
if (this._messageDispatcher != null) {
@@ -1881,7 +1816,6 @@ public class CacheClientProxy implements ClientSession {
return null;
}
-
/**
* Reinitialize a durable <code>CacheClientProxy</code> with a new client.
*
@@ -1952,7 +1886,7 @@ public class CacheClientProxy implements ClientSession {
// Close the proxy
terminateDispatching(false);
- _cacheClientNotifier._statistics.incQueueDroppedCount();
+ cacheClientNotifier._statistics.incQueueDroppedCount();
/**
* Setting the expiration task to null again and cancelling existing one, if any. See
@@ -1978,7 +1912,7 @@ public class CacheClientProxy implements ClientSession {
};
if (this._durableExpirationTask.compareAndSet(null, task)) {
- _cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L);
+ cache.getCCPTimer().schedule(task, getDurableTimeout() * 1000L);
}
}
@@ -1996,11 +1930,131 @@ public class CacheClientProxy implements ClientSession {
}
}
+ public static void setTestHook(TestHook value) {
+ testHook = value;
+ }
+
+ public static void unsetTestHook() {
+ testHook = null;
+ }
+
+ public static TestHook getTestHook() {
+ return testHook;
+ }
+
+ public static void setSlowStartForTesting() {
+ isSlowStartForTesting = true;
+ }
+
+ static void unsetSlowStartForTesting() {
+ isSlowStartForTesting = false;
+ }
+
+ static void setAfterMessageCreationForTesting() {
+ afterMessageCreationForTesting = true;
+ }
+
+ static void unsetAfterMessageCreationForTesting() {
+ afterMessageCreationForTesting = false;
+ }
+
+ Socket getSocketForTesting() {
+ return this.socket;
+ }
+
+ ClientInterestList[] getClientInterestListForTesting() {
+ return this.cils;
+ }
+
+ MessageDispatcher getMessageDispatcherForTesting() {
+ return this._messageDispatcher;
+ }
+
+ /**
+ * Returns the current number of CQS the client installed.
+ *
+ * @return int the current count of CQs for this client
+ */
+ public int getCqCount() {
+ synchronized (this) {
+ return this._statistics.getCqCount();
+ }
+ }
+
+ /**
+ * Increment the number of CQs the client installed
+ *
+ */
+ public void incCqCount() {
+ synchronized (this) {
+ this._statistics.incCqCount();
+ }
+ }
+
+ /**
+ * Decrement the number of CQs the client installed
+ *
+ */
+ public synchronized void decCqCount() {
+ synchronized (this) {
+ this._statistics.decCqCount();
+ }
+ }
+
+ /**
+ * Returns true if the client has one CQ
+ *
+ * @return true if the client has one CQ
+ */
+ public boolean hasOneCq() {
+ synchronized (this) {
+ return this._statistics.getCqCount() == 1;
+ }
+ }
+
+ /**
+ * Returns true if the client has no CQs
+ *
+ * @return true if the client has no CQs
+ */
+ public boolean hasNoCq() {
+ synchronized (this) {
+ return this._statistics.getCqCount() == 0;
+ }
+ }
+
+ /**
+ * Get map of regions with empty data policy
+ *
+ * @since GemFire 6.1
+ */
+ public Map getRegionsWithEmptyDataPolicy() {
+ return regionsWithEmptyDataPolicy;
+ }
+
+ public int incrementAndGetPingCounter() {
+ int pingCount = this.pingCounter.incrementAndGet();
+ return pingCount;
+ }
+
+ public void resetPingCounter() {
+ this.pingCounter.set(0);
+ }
+
+ /**
+ * Returns the number of seconds that have elapsed since the Client proxy created.
+ *
+ * @since GemFire 7.0
+ */
+ public long getUpTime() {
+ return (System.currentTimeMillis() - this.creationDate.getTime()) / 1000;
+ }
+
/**
* Class <code>ClientInterestList</code> provides a convenient interface for manipulating client
* interest information.
*/
- static protected class ClientInterestList {
+ static class ClientInterestList {
final CacheClientProxy ccp;
@@ -2035,7 +2089,7 @@ public class CacheClientProxy implements ClientSession {
}
Set keysRegistered = null;
synchronized (this.interestListLock) {
- LocalRegion r = (LocalRegion) this.ccp._cache.getRegion(regionName, true);
+ LocalRegion r = (LocalRegion) this.ccp.cache.getRegion(regionName, true);
if (r == null) {
throw new RegionDestroyedException("Region could not be found for interest registration",
regionName);
@@ -2059,7 +2113,7 @@ public class CacheClientProxy implements ClientSession {
protected FilterProfile getProfile(String regionName) {
try {
- return this.ccp._cache.getFilterProfile(regionName);
+ return this.ccp.cache.getFilterProfile(regionName);
} catch (CancelException e) {
return null;
}
@@ -2225,7 +2279,6 @@ public class CacheClientProxy implements ClientSession {
}
}
-
/**
* Class <code>MessageDispatcher</code> is a <code>Thread</code> that processes messages bound for
* the client by taking messsages from the message queue and sending them to the client over the
@@ -2238,34 +2291,17 @@ public class CacheClientProxy implements ClientSession {
*/
protected final HARegionQueue _messageQueue;
- // /**
- // * An int used to keep track of the number of messages dropped for logging
- // * purposes. If greater than zero then a warning has been logged about
- // * messages being dropped.
- // */
- // private int _numberOfMessagesDropped = 0;
-
/**
* The proxy for which this dispatcher is processing messages
*/
private final CacheClientProxy _proxy;
- // /**
- // * The conflator faciliates message conflation
- // */
- // protected BridgeEventConflator _eventConflator;
-
/**
* Whether the dispatcher is stopped
*/
private volatile boolean _isStopped = true;
/**
- * guarded.By _pausedLock
- */
- // boolean _isPausedDispatcher = false;
-
- /**
* A lock object used to control pausing this dispatcher
*/
protected final Object _pausedLock = new Object();
@@ -2278,11 +2314,6 @@ public class CacheClientProxy implements ClientSession {
private final ReadWriteLock socketLock = new ReentrantReadWriteLock();
private final Lock socketWriteLock = socketLock.writeLock();
- // /**
- // * A boolean verifying whether a warning has already been issued if the
- // * message queue has reached its capacity.
- // */
- // private boolean _messageQueueCapacityReachedWarning = false;
/**
* Constructor.
@@ -2307,7 +2338,7 @@ public class CacheClientProxy implements ClientSession {
HARegionQueueAttributes harq = new HARegionQueueAttributes();
harq.setBlockingQueueCapacity(proxy._maximumMessageCount);
harq.setExpiryTime(proxy._messageTimeToLive);
- ((HAContainerWrapper) proxy._cacheClientNotifier.getHaContainer())
+ ((HAContainerWrapper) proxy.cacheClientNotifier.getHaContainer())
.putProxy(HARegionQueue.createRegionName(getProxy().getHARegionName()), getProxy());
boolean createDurableQueue = proxy.proxyID.isDurable();
boolean canHandleDelta = (proxy.clientVersion.compareTo(Version.GFE_61) >= 0)
@@ -2318,7 +2349,7 @@ public class CacheClientProxy implements ClientSession {
}
this._messageQueue = HARegionQueue.getHARegionQueueInstance(getProxy().getHARegionName(),
getCache(), harq, HARegionQueue.BLOCKING_HA_QUEUE, createDurableQueue,
- proxy._cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
+ proxy.cacheClientNotifier.getHaContainer(), proxy.getProxyID(),
this._proxy.clientConflation, this._proxy.isPrimary(), canHandleDelta);
// Check if interests were registered during HARegion GII.
if (this._proxy.hasRegisteredInterested()) {
@@ -2409,10 +2440,6 @@ public class CacheClientProxy implements ClientSession {
Thread.sleep(500);
} catch (InterruptedException e) {
interrupted = true;
- /*
- * GemFireCache c = (GemFireCache)_cache;
- * c.getDistributedSystem().getCancelCriterion().checkCancelInProgress(e);
- */
} catch (CancelException e) {
break;
} catch (CacheException e) {
@@ -2507,7 +2534,7 @@ public class CacheClientProxy implements ClientSession {
ClientMessage clientMessage = null;
while (!isStopped()) {
// SystemFailure.checkFailure(); DM's stopper does this
- if (this._proxy._cache.getCancelCriterion().isCancelInProgress()) {
+ if (this._proxy.cache.getCancelCriterion().isCancelInProgress()) {
break;
}
try {
@@ -2756,9 +2783,6 @@ public class CacheClientProxy implements ClientSession {
}
Message message = null;
- // byte[] latestValue =
- // this._eventConflator.getLatestValue(clientMessage);
-
if (clientMessage instanceof ClientUpdateMessage) {
byte[] latestValue = (byte[]) ((ClientUpdateMessage) clientMessage).getValue();
if (logger.isTraceEnabled()) {
@@ -2775,7 +2799,7 @@ public class CacheClientProxy implements ClientSession {
message = ((ClientUpdateMessageImpl) clientMessage).getMessage(getProxy(), latestValue);
- if (AFTER_MESSAGE_CREATION_FLAG) {
+ if (afterMessageCreationForTesting) {
ClientServerObserver bo = ClientServerObserverHolder.getInstance();
bo.afterMessageCreation(message);
}
@@ -2783,37 +2807,9 @@ public class CacheClientProxy implements ClientSession {
message = clientMessage.getMessage(getProxy(), true /* notify */);
}
- // //////////////////////////////
- // TEST CODE BEGIN (Throws exception to test closing proxy)
- // if (true) throw new IOException("test");
- // TEST CODE END
- // //////////////////////////////
- // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID,
- // latestValue);
- // Message message = clientMessage.getMessage(); removed during merge.
- // BugFix for BUG#38206 and BUG#37791
if (!this._proxy.isPaused()) {
sendMessage(message);
- // //////////////////////////////
- // TEST CODE BEGIN (Throws exception to test closing proxy)
- // if (true) throw new IOException("test");
- // TEST CODE END
- // //////////////////////////////
- // Message message = ((ClientUpdateMessageImpl)clientMessage).getMessage(getProxy().proxyID,
- // latestValue);
- // Message message = clientMessage.getMessage(); removed during merge.
- // message.setComms(getSocket(), getCommBuffer(), getStatistics());
- // message.send();
-
- // //////////////////////////////
- // TEST CODE BEGIN (Introduces random wait in client)
- // Sleep a random number of ms
- // java.util.Random rand = new java.util.Random();
- // try {Thread.sleep(rand.nextInt(5));} catch (InterruptedException e) {}
- // TEST CODE END
- // //////////////////////////////
-
if (logger.isTraceEnabled()) {
logger.trace("{}: Dispatched {}", this, clientMessage);
}
@@ -2855,7 +2851,7 @@ public class CacheClientProxy implements ClientSession {
try {
this._messageQueue.put(clientMessage);
if (this._proxy.isPaused() && this._proxy.isDurable()) {
- this._proxy._cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount();
+ this._proxy.cacheClientNotifier._statistics.incEventEnqueuedWhileClientAwayCount();
if (logger.isDebugEnabled()) {
logger.debug("{}: Queued message while Durable Client is away {}", this, clientMessage);
}
@@ -2959,7 +2955,7 @@ public class CacheClientProxy implements ClientSession {
this._pausedLock.notifyAll();
}
- protected Object deserialize(byte[] serializedBytes) {
+ private Object deserialize(byte[] serializedBytes) {
Object deserializedObject = serializedBytes;
// This is a debugging method so ignore all exceptions like
// ClassNotFoundException
@@ -2983,89 +2979,7 @@ public class CacheClientProxy implements ClientSession {
}
}
- /**
- * Returns the current number of CQS the client installed.
- *
- * @return int the current count of CQs for this client
- */
- public int getCqCount() {
- synchronized (this) {
- return this._statistics.getCqCount();
- }
- }
-
- /**
- * Increment the number of CQs the client installed
- *
- */
- public void incCqCount() {
- synchronized (this) {
- this._statistics.incCqCount();
- }
- }
-
- /**
- * Decrement the number of CQs the client installed
- *
- */
- public synchronized void decCqCount() {
- synchronized (this) {
- this._statistics.decCqCount();
- }
- }
-
- /**
- * Returns true if the client has one CQ
- *
- * @return true if the client has one CQ
- */
- public boolean hasOneCq() {
- synchronized (this) {
- return this._statistics.getCqCount() == 1;
- }
- }
-
- /**
- * Returns true if the client has no CQs
- *
- * @return true if the client has no CQs
- */
- public boolean hasNoCq() {
- synchronized (this) {
- return this._statistics.getCqCount() == 0;
- }
- }
-
- /**
- * Get map of regions with empty data policy
- *
- * @since GemFire 6.1
- */
- public Map getRegionsWithEmptyDataPolicy() {
- return regionsWithEmptyDataPolicy;
- }
-
- public int incrementAndGetPingCounter() {
- int pingCount = this.pingCounter.incrementAndGet();
- return pingCount;
- }
-
- public void resetPingCounter() {
- this.pingCounter.set(0);
- }
-
- /**
- * Returns the number of seconds that have elapsed since the Client proxy created.
- *
- * @since GemFire 7.0
- */
- public long getUpTime() {
- return (long) ((System.currentTimeMillis() - this.creationDate.getTime()) / 1000);
- }
-
public interface TestHook {
- public void doTestHook(String spot);
+ void doTestHook(String spot);
}
-
- public static TestHook testHook;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
index e21a834..6e8f9ce 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientHealthMonitor.java
@@ -22,7 +22,11 @@ import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.Version;
-import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.CacheClientStatus;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.IncomingGatewayStatus;
+import org.apache.geode.internal.cache.TXId;
+import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.tier.Acceptor;
import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -32,7 +36,14 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.logging.log4j.Logger;
import java.net.InetAddress;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerArray;
/**
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
index 2cbf63b..46e43c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientProxyMembershipID.java
@@ -34,11 +34,8 @@ import static org.apache.geode.distributed.ConfigurationProperties.*;
/**
* This class represents a ConnectionProxy of the CacheClient
- *
- *
- *
*/
-public final class ClientProxyMembershipID
+public class ClientProxyMembershipID
implements DataSerializableFixedID, Serializable, Externalizable {
private static final Logger logger = LogService.getLogger();
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index 6bbe7b8..7d1603d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -352,8 +352,8 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
byte[] serializedValue = null;
Message message = null;
boolean conflation = false;
- conflation = (proxy.clientConflation == HandShake.CONFLATION_ON)
- || (proxy.clientConflation == HandShake.CONFLATION_DEFAULT && this.shouldBeConflated());
+ conflation = (proxy.isClientConflationOn())
+ || (proxy.isClientConflationDefault() && this.shouldBeConflated());
if (latestValue != null) {
serializedValue = latestValue;
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
index 6e119c0..8a51c31 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HandShake.java
@@ -62,6 +62,7 @@ import org.apache.geode.CancelCriterion;
import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.InternalGemFireException;
+import org.apache.geode.LogWriter;
import org.apache.geode.cache.GatewayConfigurationException;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.ServerRefusedConnectionException;
@@ -1669,8 +1670,8 @@ public class HandShake implements ClientHandShake {
* not
*/
public static Object verifyCredentials(String authenticatorMethod, Properties credentials,
- Properties securityProperties, InternalLogWriter logWriter,
- InternalLogWriter securityLogWriter, DistributedMember member)
+ Properties securityProperties, LogWriter logWriter, LogWriter securityLogWriter,
+ DistributedMember member)
throws AuthenticationRequiredException, AuthenticationFailedException {
if (!AcceptorImpl.isAuthenticationRequired()) {
@@ -1702,8 +1703,8 @@ public class HandShake implements ClientHandShake {
String methodName = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR);
return verifyCredentials(methodName, this.credentials, this.system.getSecurityProperties(),
- (InternalLogWriter) this.system.getLogWriter(),
- (InternalLogWriter) this.system.getSecurityLogWriter(), this.id.getDistributedMember());
+ this.system.getLogWriter(), this.system.getSecurityLogWriter(),
+ this.id.getDistributedMember());
}
public void sendCredentialsForWan(OutputStream out, InputStream in) {
@@ -1731,8 +1732,7 @@ public class HandShake implements ClientHandShake {
String authenticator = this.system.getProperties().getProperty(SECURITY_CLIENT_AUTHENTICATOR);
Properties peerWanProps = readCredentials(dis, dos, this.system);
verifyCredentials(authenticator, peerWanProps, this.system.getSecurityProperties(),
- (InternalLogWriter) this.system.getLogWriter(),
- (InternalLogWriter) this.system.getSecurityLogWriter(), member);
+ this.system.getLogWriter(), this.system.getSecurityLogWriter(), member);
}
private static int getKeySize(String skAlgo) {
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
index 5a229d3..dfce317 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/logging/LogService.java
@@ -151,6 +151,16 @@ public class LogService extends LogManager {
}
/**
+ * Returns a Logger with the name of @{link SECURITY_LOGGER_NAME}.
+ *
+ * @return The security Logger.
+ */
+ public static Logger getSecurityLogger() {
+ return new FastLogger(
+ LogManager.getLogger(SECURITY_LOGGER_NAME, GemFireParameterizedMessageFactory.INSTANCE));
+ }
+
+ /**
* Returns a LogWriterLogger that is decorated with the LogWriter and LogWriterI18n methods.
* <p>
* This is the bridge to LogWriter and LogWriterI18n that we need to eventually stop using in
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
new file mode 100644
index 0000000..017e0f5
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/ServerLauncherUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed;
+
+import org.apache.geode.cache.Cache;
+
+/**
+ * Provides tests a way to access non-public state in ServerLauncher
+ */
+public class ServerLauncherUtils {
+
+ /**
+ * Returns the Cache from an online in-process ServerLauncher instance
+ */
+ public static Cache getCache(final ServerLauncher serverLauncher) {
+ return serverLauncher.getCache();
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
index 7aa11b7..8ff541b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
@@ -14,21 +14,14 @@
*/
package org.apache.geode.internal.cache.tier.sockets;
-import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.client.ServerRefusedConnectionException;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.AvailablePort;
import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.tier.Acceptor;
-import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.test.junit.categories.ClientServerTest;
import org.apache.geode.test.junit.categories.IntegrationTest;
import org.junit.After;
@@ -39,7 +32,6 @@ import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.net.BindException;
-import java.net.Socket;
import java.util.Collections;
import java.util.Properties;
@@ -85,7 +77,8 @@ public class AcceptorImplJUnitTest {
CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, CacheServer.DEFAULT_MAX_THREADS,
CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
- null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+ null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+ this.cache.getCancelCriterion());
fail("Expected an IllegalArgumentExcption due to max conns < min pool size");
} catch (IllegalArgumentException expected) {
}
@@ -95,7 +88,7 @@ public class AcceptorImplJUnitTest {
CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0,
CacheServer.DEFAULT_MAX_THREADS, CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, Collections.EMPTY_LIST,
- CacheServer.DEFAULT_TCP_NO_DELAY);
+ CacheServer.DEFAULT_TCP_NO_DELAY, this.cache.getCancelCriterion());
fail("Expected an IllegalArgumentExcption due to max conns of zero");
} catch (IllegalArgumentException expected) {
}
@@ -105,12 +98,14 @@ public class AcceptorImplJUnitTest {
CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
- null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+ null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+ this.cache.getCancelCriterion());
a2 = new AcceptorImpl(port1, null, false, CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
- null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+ null, null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+ this.cache.getCancelCriterion());
fail("Expecetd a BindException while attaching to the same port");
} catch (BindException expected) {
}
@@ -119,7 +114,8 @@ public class AcceptorImplJUnitTest {
CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
AcceptorImpl.MINIMUM_MAX_CONNECTIONS, CacheServer.DEFAULT_MAX_THREADS,
CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
- null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY);
+ null, false, Collections.EMPTY_LIST, CacheServer.DEFAULT_TCP_NO_DELAY,
+ this.cache.getCancelCriterion());
assertEquals(port2, a3.getPort());
InternalDistributedSystem isystem =
(InternalDistributedSystem) this.cache.getDistributedSystem();
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
new file mode 100644
index 0000000..7ab539d
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.wan.GatewayTransportFilter;
+import org.apache.geode.distributed.internal.DistributionConfigImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.net.SocketCreatorFactory;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+@Category(UnitTest.class)
+public class AcceptorImplTest {
+
+ @Before
+ public void before() throws Exception {
+ DistributionConfigImpl distributionConfig = new DistributionConfigImpl(new Properties());
+ SocketCreatorFactory.setDistributionConfig(distributionConfig);
+ }
+
+ @After
+ public void after() throws Exception {
+ SocketCreatorFactory.close();
+ }
+
+ @Test
+ public void constructWithDefaults() throws Exception {
+ /*
+ * Problems:
+ *
+ * this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, maximumMessageCount,
+ * messageTimeToLive, connectionListener, overflowAttributesList, isGatewayReceiver);
+ *
+ * this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings,
+ * this.clientNotifier.getStats());
+ *
+ * LoggingThreadGroup / ThreadFactory / ThreadPoolExecutor
+ *
+ * isAuthenticationRequired = this.securityService.isClientSecurityRequired();
+ *
+ * isIntegratedSecurity = this.securityService.isIntegratedSecurity();
+ *
+ *
+ * String postAuthzFactoryName =
+ * this.cache.getDistributedSystem().getProperties().getProperty(SECURITY_CLIENT_ACCESSOR_PP);
+ *
+ */
+
+ int port = 0;
+ String bindHostName = SocketCreator.getLocalHost().getHostName();
+ boolean notifyBySubscription = false;
+ int socketBufferSize = 1;
+ int maximumTimeBetweenPings = 0;
+ InternalCache internalCache = null;
+ int maxConnections = 0;
+ int maxThreads = 0;
+ int maximumMessageCount = 0;
+ int messageTimeToLive = 0;
+ ConnectionListener listener = null;
+ List overflowAttributesList = null;
+ boolean isGatewayReceiver = false;
+ List<GatewayTransportFilter> transportFilter = Collections.emptyList();
+ boolean tcpNoDelay = false;
+ CancelCriterion cancelCriterion = null;
+
+ AcceptorImpl acceptor = new AcceptorImpl(port, bindHostName, notifyBySubscription,
+ socketBufferSize, maximumTimeBetweenPings, internalCache, maxConnections, maxThreads,
+ maximumMessageCount, messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver,
+ transportFilter, tcpNoDelay, cancelCriterion);
+
+ assertThat(acceptor).isNotNull();
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
new file mode 100644
index 0000000..8cd7622
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheServerUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.internal.cache.CacheServerImpl;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Provides tests a way to access CacheServer, AcceptorImpl and ServerConnection
+ */
+public class CacheServerUtils {
+
+ /**
+ * Returns single CacheServer for the specified Cache instance
+ */
+ public static CacheServer getCacheServer(final Cache cache) {
+ List<CacheServer> cacheServers = cache.getCacheServers();
+ CacheServer cacheServer = cacheServers.get(0);
+ return cacheServer;
+ }
+
+ /**
+ * Returns AcceptorImpl for the specified CacheServer instance
+ */
+ public static AcceptorImpl getAcceptorImpl(final CacheServer cacheServer) {
+ AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+ return acceptor;
+ }
+
+ /**
+ * Returns single ServerConnection for the specified CacheServer instance
+ */
+ public static ServerConnection getServerConnection(final CacheServer cacheServer) {
+ AcceptorImpl acceptor = ((CacheServerImpl) cacheServer).getAcceptor();
+ Set<ServerConnection> serverConnections = acceptor.getAllServerConnections();
+ ServerConnection serverConnection = serverConnections.iterator().next(); // null
+ return serverConnection;
+ }
+}
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
index 31f67aa..f4a8cc8 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientConflationDUnitTest.java
@@ -99,7 +99,7 @@ public class ClientConflationDUnitTest extends JUnit4DistributedTestCase {
*
*/
public static void setIsSlowStart() {
- CacheClientProxy.isSlowStartForTesting = true;
+ CacheClientProxy.setSlowStartForTesting();
System.setProperty("slowStartTimeForTesting", "15000");
}
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
index 1a76daa..efc0367 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerForceInvalidateDUnitTest.java
@@ -166,7 +166,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase {
}
private static void installObserver() {
- CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = true;
+ CacheClientProxy.setAfterMessageCreationForTesting();
ClientServerObserverHolder.setInstance(new DelaySendingEvent());
}
@@ -176,7 +176,7 @@ public class ClientServerForceInvalidateDUnitTest extends JUnit4CacheTestCase {
}
private static void cleanupObserver() {
- CacheClientProxy.AFTER_MESSAGE_CREATION_FLAG = false;
+ CacheClientProxy.unsetAfterMessageCreationForTesting();
ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter());
}
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
index b4f3185..43330a5 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientServerMiscDUnitTest.java
@@ -62,13 +62,9 @@ import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.standalone.VersionManager;
import org.apache.geode.test.junit.categories.ClientServerTest;
import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import java.util.Collection;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
@@ -1014,7 +1010,8 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
while (iter_prox.hasNext()) {
CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
// CCP should not contain region1
- Set akr = ccp.cils[RegisterInterestTracker.interestListIndex].regions;
+ Set akr = ccp
+ .getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex].regions;
assertNotNull(akr);
assertTrue(!akr.contains(Region.SEPARATOR + REGION_NAME1));
// CCP should contain region2
@@ -1352,7 +1349,7 @@ public class ClientServerMiscDUnitTest extends JUnit4CacheTestCase {
*
*/
public static void unsetSlowDispatcherFlag() {
- CacheClientProxy.isSlowStartForTesting = false;
+ CacheClientProxy.unsetSlowStartForTesting();
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
index b1e16ee..275e458 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ConflationDUnitTest.java
@@ -112,7 +112,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase {
*
*/
public static void setIsSlowStart(String milis) {
- CacheClientProxy.isSlowStartForTesting = true;
+ CacheClientProxy.setSlowStartForTesting();
System.setProperty("slowStartTimeForTesting", milis);
}
@@ -121,7 +121,7 @@ public class ConflationDUnitTest extends JUnit4DistributedTestCase {
*
*/
public static void unsetIsSlowStart() {
- CacheClientProxy.isSlowStartForTesting = false;
+ CacheClientProxy.unsetSlowStartForTesting();
}
/**
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
index 544f732..9d60cc7 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAInterestTestCase.java
@@ -459,7 +459,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
iter_prox = ccn.getClientProxies().iterator();
if (iter_prox.hasNext()) {
proxy = (CacheClientProxy) iter_prox.next();
- return proxy._messageDispatcher.isAlive();
+ return proxy.getMessageDispatcherForTesting().isAlive();
} else {
return false;
}
@@ -510,7 +510,7 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
if (iter_prox.hasNext()) {
CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
assertFalse("Dispatcher on secondary should not be alive",
- proxy._messageDispatcher.isAlive());
+ proxy.getMessageDispatcherForTesting().isAlive());
}
}
@@ -818,8 +818,10 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
wc = new WaitCriterion() {
@Override
public boolean done() {
- Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
- .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+ Set keysMap =
+ (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+ .getProfile(Region.SEPARATOR + REGION_NAME)
+ .getKeysOfInterestFor(ccp.getProxyID());
return keysMap != null && keysMap.size() == 2;
}
@@ -830,8 +832,9 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
};
Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
- Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
- .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+ Set keysMap =
+ (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+ .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
assertNotNull(keysMap);
assertEquals(2, keysMap.size());
assertTrue(keysMap.contains(k1));
@@ -879,8 +882,10 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
wc = new WaitCriterion() {
@Override
public boolean done() {
- Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
- .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+ Set keysMap =
+ (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+ .getProfile(Region.SEPARATOR + REGION_NAME)
+ .getKeysOfInterestFor(ccp.getProxyID());
return keysMap != null;
}
@@ -891,8 +896,9 @@ public class HAInterestTestCase extends JUnit4DistributedTestCase {
};
Wait.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
- Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
- .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+ Set keysMap =
+ (Set) ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+ .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
assertNotNull(keysMap);
assertEquals(1, keysMap.size());
assertFalse(keysMap.contains(k1));
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
index 6aea509..3585c3e 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HAStartupAndFailoverDUnitTest.java
@@ -471,7 +471,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase {
String excuse;
public boolean done() {
- return proxy._messageDispatcher.isAlive();
+ return proxy.getMessageDispatcherForTesting().isAlive();
}
public String description() {
@@ -529,7 +529,7 @@ public class HAStartupAndFailoverDUnitTest extends JUnit4DistributedTestCase {
if (iter_prox.hasNext()) {
CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
assertFalse("Dispatcher on secondary should not be alive",
- proxy._messageDispatcher.isAlive());
+ proxy.getMessageDispatcherForTesting().isAlive());
}
} catch (Exception ex) {
fail("while setting verifyDispatcherIsNotAlive " + ex);
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
index 041cd38..be9265b 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/InterestListRecoveryDUnitTest.java
@@ -434,8 +434,8 @@ public class InterestListRecoveryDUnitTest extends JUnit4DistributedTestCase {
public static Set getKeysOfInterestMap(CacheClientProxy proxy, String regionName) {
// assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]);
// assertNotNull(proxy.cils[RegisterInterestTracker.interestListIndex]._keysOfInterest);
- return proxy.cils[RegisterInterestTracker.interestListIndex].getProfile(regionName)
- .getKeysOfInterestFor(proxy.getProxyID());
+ return proxy.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+ .getProfile(regionName).getKeysOfInterestFor(proxy.getProxyID());
}
@Override
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
index 4a98298..1635fca 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/RedundancyLevelTestBase.java
@@ -189,10 +189,10 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
String excuse;
public boolean done() {
- if (proxy._messageDispatcher == null) {
+ if (proxy.getMessageDispatcherForTesting() == null) {
return false;
}
- return proxy._messageDispatcher.isAlive();
+ return proxy.getMessageDispatcherForTesting().isAlive();
}
public String description() {
@@ -245,7 +245,7 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
if (iter_prox.hasNext()) {
CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
assertFalse("Dispatcher on secondary should not be alive",
- proxy._messageDispatcher.isAlive());
+ proxy.getMessageDispatcherForTesting().isAlive());
}
} catch (Exception ex) {
@@ -427,8 +427,10 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
String excuse;
public boolean done() {
- Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
- .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+ Set keysMap = (Set) ccp
+ .getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+ .getProfile(Region.SEPARATOR + REGION_NAME)
+ .getKeysOfInterestFor(ccp.getProxyID());
if (keysMap == null) {
excuse = "keys of interest is null";
return false;
@@ -446,8 +448,9 @@ public class RedundancyLevelTestBase extends JUnit4DistributedTestCase {
};
Wait.waitForCriterion(wc, 180 * 1000, 2 * 1000, true);
- Set keysMap = ccp.cils[RegisterInterestTracker.interestListIndex]
- .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
+ Set keysMap =
+ ccp.getClientInterestListForTesting()[RegisterInterestTracker.interestListIndex]
+ .getProfile(Region.SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
assertTrue(keysMap.contains(k1));
assertTrue(keysMap.contains(k2));
http://git-wip-us.apache.org/repos/asf/geode/blob/407afd93/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
new file mode 100644
index 0000000..2d900dc
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/command/ExperimentIntegrationTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import static org.apache.geode.distributed.AbstractLauncher.Status.ONLINE;
+import static org.apache.geode.distributed.ServerLauncherUtils.*;
+import static org.apache.geode.internal.cache.tier.sockets.AcceptorImpl.DEFAULT_HANDSHAKE_TIMEOUT_MS;
+import static org.apache.geode.internal.cache.tier.sockets.CacheServerUtils.*;
+import static org.apache.geode.internal.AvailablePort.*;
+import static org.assertj.core.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ServerLauncher;
+import org.apache.geode.internal.cache.tier.Acceptor;
+import org.apache.geode.internal.cache.tier.Command;
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+import org.apache.geode.internal.cache.tier.sockets.Message;
+import org.apache.geode.internal.cache.tier.sockets.ServerConnection;
+import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.net.Socket;
+
+@Category(IntegrationTest.class)
+public class ExperimentIntegrationTest {
+
+ private ServerLauncher serverLauncher;
+ private ServerConnection serverConnection;
+
+ @Before
+ public void before() throws Exception {
+ int serverPort = getRandomAvailablePort(SOCKET);
+
+ this.serverLauncher =
+ new ServerLauncher.Builder().setMemberName("server").setServerPort(serverPort).build();
+ this.serverLauncher.start();
+
+ Cache cache = getCache(this.serverLauncher);
+ CacheServer cacheServer = getCacheServer(cache);
+ AcceptorImpl acceptor = getAcceptorImpl(cacheServer);
+
+ Socket mockSocket = mock(Socket.class);
+ when(mockSocket.getInetAddress()).thenReturn(SocketCreator.getLocalHost());
+
+ this.serverConnection =
+ new ServerConnection(mockSocket, cache, null, null, DEFAULT_HANDSHAKE_TIMEOUT_MS,
+ CacheServer.DEFAULT_SOCKET_BUFFER_SIZE, "client", Acceptor.CLIENT_TO_SERVER, acceptor);
+
+ preConditions();
+ }
+
+ public void preConditions() throws Exception {
+ assertThat(this.serverLauncher.status().getStatus()).isEqualTo(ONLINE);
+ }
+
+ @Test
+ public void handlePutFromFakeClient() throws Exception {
+ Message message = mock(Message.class);
+ Command command = mock(Command.class);
+ command.execute(message, this.serverConnection);
+ }
+
+}