You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:13 UTC
[08/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PoolImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PoolImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PoolImpl.java
new file mode 100644
index 0000000..1c22037
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PoolImpl.java
@@ -0,0 +1,1513 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.StatisticsFactory;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.NoSubscriptionServersAvailableException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionService;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManager;
+import com.gemstone.gemfire.cache.client.internal.pooling.ConnectionManagerImpl;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.PoolCancelledException;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.DummyStatisticsFactory;
+import com.gemstone.gemfire.internal.ScheduledThreadPoolExecutorWithKeepAlive;
+import com.gemstone.gemfire.internal.admin.ClientStatsManager;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.InternalCache;
+import com.gemstone.gemfire.internal.cache.PoolFactoryImpl;
+import com.gemstone.gemfire.internal.cache.PoolManagerImpl;
+import com.gemstone.gemfire.internal.cache.PoolStats;
+import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+
+/**
+ * Manages the client side of client to server connections
+ * and client queues.
+ *
+ * @author dsmith
+ * @since 5.7
+ */
+public class PoolImpl implements InternalPool {
+ private static final Logger logger = LogService.getLogger();
+
+ public static final int HANDSHAKE_TIMEOUT = Long.getLong("gemfire.PoolImpl.HANDSHAKE_TIMEOUT", AcceptorImpl.DEFAULT_HANDSHAKE_TIMEOUT_MS).intValue();
+ public static final long SHUTDOWN_TIMEOUT = Long.getLong("gemfire.PoolImpl.SHUTDOWN_TIMEOUT", 30000).longValue();
+ public static final int BACKGROUND_TASK_POOL_SIZE = Integer.getInteger("gemfire.PoolImpl.BACKGROUND_TASK_POOL_SIZE", 20).intValue();
+ public static final int BACKGROUND_TASK_POOL_KEEP_ALIVE = Integer.getInteger("gemfire.PoolImpl.BACKGROUND_TASK_POOL_KEEP_ALIVE", 1000).intValue();
+ //For durable client tests only. Connection Sources read this flag
+ //and return an empty list of servers.
+ public volatile static boolean TEST_DURABLE_IS_NET_DOWN = false;
+
+ private final String name;
+ private final int freeConnectionTimeout;
+ private final int loadConditioningInterval;
+ private final int socketBufferSize;
+ private final boolean threadLocalConnections;
+ private final int readTimeout;
+ private final boolean subscriptionEnabled;
+ private final boolean prSingleHopEnabled;
+ private final int subscriptionRedundancyLevel;
+ private final int subscriptionMessageTrackingTimeout;
+ private final int subscriptionAckInterval;
+ private final String serverGroup;
+ private final List<InetSocketAddress> locators;
+ private final List<InetSocketAddress> servers;
+ private final boolean startDisabled;
+ private final boolean usedByGateway;
+ private final int maxConnections;
+ private final int minConnections;
+ private final int retryAttempts;
+ private final long idleTimeout;
+ private final long pingInterval;
+ private final int statisticInterval;
+ private final boolean multiuserSecureModeEnabled;
+
+ private final ConnectionSource source;
+ private final ConnectionManager manager;
+ private QueueManager queueManager;
+ protected final EndpointManager endpointManager;
+ private final PoolManagerImpl pm;
+ protected final InternalLogWriter securityLogWriter;
+ protected volatile boolean destroyed;
+ private final PoolStats stats;
+ private ScheduledExecutorService backgroundProcessor;
+ private final OpExecutorImpl executor;
+ private final RegisterInterestTracker riTracker = new RegisterInterestTracker();
+ private final InternalDistributedSystem dsys;
+
+ private final ClientProxyMembershipID proxyId;
+ protected final CancelCriterion cancelCriterion;
+ private final ConnectionFactoryImpl connectionFactory;
+
+ private final ArrayList<ProxyCache> proxyCacheList;
+
+ private final GatewaySender gatewaySender;
+
+ private boolean keepAlive=false;
+ private static Object simpleLock=new Object();
+
+ public static final int PRIMARY_QUEUE_NOT_AVAILABLE = -2;
+ public static final int PRIMARY_QUEUE_TIMED_OUT = -1;
+ private AtomicInteger primaryQueueSize = new AtomicInteger(PRIMARY_QUEUE_NOT_AVAILABLE);
+
+ public static PoolImpl create(PoolManagerImpl pm, String name, Pool attributes) {
+ PoolImpl pool = new PoolImpl(pm, name, attributes);
+ pool.finishCreate(pm);
+ return pool;
+ }
+
+ public boolean isUsedByGateway() {
+ return usedByGateway;
+ }
+
+ /**
+ * @since 5.7
+ */
+ protected void finishCreate(PoolManagerImpl pm) {
+ pm.register(this);
+ try {
+ start();
+ } catch(RuntimeException e) {
+ try {
+ destroy(false);
+ } catch(RuntimeException e2) {
+ //do nothing
+ }
+ throw e;
+ }
+ }
+
+ protected PoolImpl(PoolManagerImpl pm, String name, Pool attributes) {
+ this.pm = pm;
+ this.name = name;
+ this.freeConnectionTimeout = attributes.getFreeConnectionTimeout();
+ this.loadConditioningInterval = attributes.getLoadConditioningInterval();
+ this.socketBufferSize = attributes.getSocketBufferSize();
+ this.threadLocalConnections = attributes.getThreadLocalConnections();
+ this.readTimeout = attributes.getReadTimeout();
+ this.minConnections = attributes.getMinConnections();
+ this.maxConnections = attributes.getMaxConnections();
+ this.retryAttempts = attributes.getRetryAttempts();
+ this.idleTimeout = attributes.getIdleTimeout();
+ this.pingInterval = attributes.getPingInterval();
+ this.statisticInterval = attributes.getStatisticInterval();
+ this.subscriptionEnabled = attributes.getSubscriptionEnabled();
+ this.prSingleHopEnabled = attributes.getPRSingleHopEnabled();
+ this.subscriptionRedundancyLevel = attributes.getSubscriptionRedundancy();
+ this.subscriptionMessageTrackingTimeout = attributes.getSubscriptionMessageTrackingTimeout();
+ this.subscriptionAckInterval = attributes.getSubscriptionAckInterval();
+ this.serverGroup = attributes.getServerGroup();
+ this.multiuserSecureModeEnabled = attributes.getMultiuserAuthentication();
+ this.locators = attributes.getLocators();
+ this.servers = attributes.getServers();
+ this.startDisabled = ((PoolFactoryImpl.PoolAttributes)attributes).startDisabled
+ || !pm.isNormal();
+ this.usedByGateway = ((PoolFactoryImpl.PoolAttributes)attributes).isGateway();
+ this.gatewaySender = ((PoolFactoryImpl.PoolAttributes)attributes).getGatewaySender();
+// if (this.subscriptionEnabled && this.multiuserSecureModeEnabled) {
+// throw new IllegalStateException(
+// "subscription-enabled and multiuser-authentication both cannot be true.");
+// }
+ InternalDistributedSystem ds = InternalDistributedSystem.getAnyInstance();
+ if(ds==null) {
+ throw new IllegalStateException(LocalizedStrings.PoolImpl_DISTRIBUTED_SYSTEM_MUST_BE_CREATED_BEFORE_CREATING_POOL.toLocalizedString());
+ }
+ this.securityLogWriter = ds.getSecurityInternalLogWriter();
+ if (!ds.getConfig().getStatisticSamplingEnabled()
+ && this.statisticInterval > 0) {
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.PoolImpl_STATISTIC_SAMPLING_MUST_BE_ENABLED_FOR_SAMPLING_RATE_OF_0_TO_TAKE_AFFECT,
+ this.statisticInterval));
+ }
+ this.dsys = ds;
+ this.cancelCriterion = new Stopper();
+ if(Boolean.getBoolean("gemfire.SPECIAL_DURABLE")) {
+ ClientProxyMembershipID.setPoolName(name);
+ this.proxyId = ClientProxyMembershipID.getNewProxyMembership(ds);
+ ClientProxyMembershipID.setPoolName(null);
+ } else {
+ this.proxyId = ClientProxyMembershipID.getNewProxyMembership(ds);
+ }
+ StatisticsFactory statFactory = null;
+ if(this.gatewaySender != null){
+ statFactory = new DummyStatisticsFactory();
+ }else{
+ statFactory = ds;
+ }
+ this.stats = this.startDisabled
+ ? null
+ : new PoolStats(statFactory, getName()+"->"+(serverGroup==null || serverGroup.equals("") ? "[any servers]" : "["+getServerGroup()+"]"));
+
+ source = getSourceImpl(((PoolFactoryImpl.PoolAttributes)attributes).locatorCallback);
+ endpointManager = new EndpointManagerImpl(name, ds,this.cancelCriterion, this.stats);
+ connectionFactory = new ConnectionFactoryImpl(source, endpointManager, ds,
+ socketBufferSize, HANDSHAKE_TIMEOUT, readTimeout, proxyId, this.cancelCriterion,
+ usedByGateway,gatewaySender, pingInterval, multiuserSecureModeEnabled, this);
+ if(subscriptionEnabled) {
+ queueManager = new QueueManagerImpl(this, endpointManager, source,
+ connectionFactory, subscriptionRedundancyLevel, pingInterval, securityLogWriter,
+ proxyId);
+ }
+
+ manager = new ConnectionManagerImpl(name, connectionFactory, endpointManager,
+ maxConnections, minConnections,
+ idleTimeout, loadConditioningInterval,
+ securityLogWriter, pingInterval,
+ cancelCriterion, getStats());
+ //Fix for 43468 - make sure we check the cache cancel criterion if we get
+ //an exception, by passing in the poolOrCache stopper
+ executor = new OpExecutorImpl(manager, queueManager, endpointManager,
+ riTracker, retryAttempts, freeConnectionTimeout, threadLocalConnections,
+ new PoolOrCacheStopper(), this);
+ if (this.multiuserSecureModeEnabled) {
+ this.proxyCacheList = new ArrayList<ProxyCache>();
+ } else {
+ this.proxyCacheList = null;
+ }
+ }
+
+ /**
+ * Return true if the given Pool is compatible with these attributes.
+ * Currently this does what equals would but in the future we might
+ * decide to weaken the compatibility contract.
+ * @since 6.5
+ */
+ public boolean isCompatible(Pool p) {
+ if (p == null) return false;
+ return getFreeConnectionTimeout() == p.getFreeConnectionTimeout()
+ && getLoadConditioningInterval() == p.getLoadConditioningInterval()
+ && getSocketBufferSize() == p.getSocketBufferSize()
+ && getMinConnections() == p.getMinConnections()
+ && getMaxConnections() == p.getMaxConnections()
+ && getIdleTimeout() == p.getIdleTimeout()
+ && getPingInterval() == p.getPingInterval()
+ && getStatisticInterval() == p.getStatisticInterval()
+ && getRetryAttempts() == p.getRetryAttempts()
+ && getThreadLocalConnections() == p.getThreadLocalConnections()
+ && getReadTimeout() == p.getReadTimeout()
+ && getSubscriptionEnabled() == p.getSubscriptionEnabled()
+ && getPRSingleHopEnabled() == p.getPRSingleHopEnabled()
+ && getSubscriptionRedundancy() == p.getSubscriptionRedundancy()
+ && getSubscriptionMessageTrackingTimeout() == p.getSubscriptionMessageTrackingTimeout()
+ && getSubscriptionAckInterval() == p.getSubscriptionAckInterval()
+ && getServerGroup().equals( p.getServerGroup())
+ && getMultiuserAuthentication() == p.getMultiuserAuthentication()
+ && getLocators().equals( p.getLocators())
+ && getServers().equals( p.getServers());
+ }
+
+ private void start() {
+ if (this.startDisabled) return;
+
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if(isDebugEnabled) {
+ List locators = getLocators();
+ if(!locators.isEmpty()) {
+ logger.debug("PoolImpl - starting pool with locators: {}", locators);
+ } else {
+ logger.debug("PoolImpl -starting pool with servers: {}", getServers());
+ }
+ }
+
+ final String timerName = "poolTimer-" + getName() + "-";
+ backgroundProcessor = new ScheduledThreadPoolExecutorWithKeepAlive(
+ BACKGROUND_TASK_POOL_SIZE, BACKGROUND_TASK_POOL_KEEP_ALIVE,
+ TimeUnit.MILLISECONDS, new ThreadFactory() {
+ AtomicInteger threadNum = new AtomicInteger();
+ public Thread newThread(final Runnable r) {
+ Thread result = new Thread(r, timerName + threadNum.incrementAndGet());
+ result.setDaemon(true);
+ return result;
+ }
+ });
+ ((ScheduledThreadPoolExecutorWithKeepAlive) backgroundProcessor)
+ .setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+ ((ScheduledThreadPoolExecutorWithKeepAlive) backgroundProcessor)
+ .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+
+ source.start(this);
+ connectionFactory.start(backgroundProcessor);
+ endpointManager.addListener(new InstantiatorRecoveryListener(backgroundProcessor, this));
+ endpointManager.addListener(new DataSerializerRecoveryListener(backgroundProcessor, this));
+ if(Boolean.getBoolean("gemfire.ON_DISCONNECT_CLEAR_PDXTYPEIDS"))
+ endpointManager.addListener(new PdxRegistryRecoveryListener(this));
+ endpointManager.addListener(new LiveServerPinger(this));
+
+ manager.start(backgroundProcessor);
+ if(queueManager != null) {
+ if (isDebugEnabled) {
+ logger.debug("starting queueManager");
+ }
+ queueManager.start(backgroundProcessor);
+ }
+ if (isDebugEnabled) {
+ logger.debug("scheduling pings every {} milliseconds", pingInterval);
+ }
+
+
+ if (this.statisticInterval > 0 && this.dsys.getConfig().getStatisticSamplingEnabled()) {
+ backgroundProcessor.scheduleWithFixedDelay(new PublishClientStatsTask(), statisticInterval, statisticInterval, TimeUnit.MILLISECONDS);
+ }
+ // LOG: changed from config to info
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.PoolImpl_POOL_0_STARTED_WITH_MULTIUSER_SECURE_MODE_ENABLED_1,
+ new Object[] {this.name, this.multiuserSecureModeEnabled}));
+ }
+
+ /**
+ * Returns the cancellation criterion for this proxy
+ * @return the cancellation criterion
+ */
+ public CancelCriterion getCancelCriterion() {
+ return this.cancelCriterion;
+ }
+
+ public void releaseThreadLocalConnection() {
+ executor.releaseThreadLocalConnection();
+ }
+
+ public void setupServerAffinity(boolean allowFailover) {
+ executor.setupServerAffinity(allowFailover);
+ }
+
+ public void releaseServerAffinity() {
+ executor.releaseServerAffinity();
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.Pool#getName()
+ */
+ public String getName() {
+ return this.name;
+ }
+ public int getFreeConnectionTimeout() {
+ return this.freeConnectionTimeout;
+ }
+ public int getLoadConditioningInterval() {
+ return this.loadConditioningInterval;
+ }
+ public int getMaxConnections() {
+ return maxConnections;
+ }
+ public int getMinConnections() {
+ return minConnections;
+ }
+ public int getRetryAttempts() {
+ return retryAttempts;
+ }
+ public long getIdleTimeout() {
+ return idleTimeout;
+ }
+ public long getPingInterval() {
+ return pingInterval;
+ }
+ public int getStatisticInterval() {
+ return this.statisticInterval;
+ }
+ public int getSocketBufferSize() {
+ return this.socketBufferSize;
+ }
+ public boolean getThreadLocalConnections() {
+ return this.threadLocalConnections;
+ }
+ public int getReadTimeout() {
+ return this.readTimeout;
+ }
+ public boolean getSubscriptionEnabled() {
+ return this.subscriptionEnabled;
+ }
+
+ public boolean getPRSingleHopEnabled() {
+ return this.prSingleHopEnabled;
+ }
+
+ public int getSubscriptionRedundancy() {
+ return this.subscriptionRedundancyLevel;
+ }
+ public int getSubscriptionMessageTrackingTimeout() {
+ return this.subscriptionMessageTrackingTimeout;
+ }
+ public int getSubscriptionAckInterval() {
+ return subscriptionAckInterval;
+ }
+ public String getServerGroup() {
+ return this.serverGroup;
+ }
+
+ public boolean getMultiuserAuthentication() {
+ return this.multiuserSecureModeEnabled;
+ }
+
+ public List<InetSocketAddress> getLocators() {
+ return this.locators;
+ }
+ public List<InetSocketAddress> getServers() {
+ return this.servers;
+ }
+
+ public GatewaySender getGatewaySender() {
+ return gatewaySender;
+ }
+
+ public InternalLogWriter getSecurityInternalLogWriter() {
+ return this.securityLogWriter;
+ }
+
+ public void destroy() {
+ destroy(false);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(100);
+ sb.append(this.getClass().getSimpleName()).append('@')
+ .append(System.identityHashCode(this)).append(" name=")
+ .append(getName());
+ return sb.toString();
+ }
+
+ public boolean getKeepAlive(){
+ return this.keepAlive;
+ }
+
+ public void destroy(boolean keepAlive) {
+ int cnt = getAttachCount();
+ this.keepAlive = keepAlive;
+ boolean SPECIAL_DURABLE = Boolean.getBoolean("gemfire.SPECIAL_DURABLE");
+ if (cnt > 0) {
+ //special case to allow closing durable client pool under the keep alive flag
+ //closing regions prior to closing pool can cause them to unregister interest
+ if (SPECIAL_DURABLE) {
+ synchronized (simpleLock) {
+ try {
+ if (!CacheFactory.getAnyInstance().isClosed() && this.getPoolOrCacheCancelInProgress() == null) {
+ Set<Region<?, ?>> regions = CacheFactory.getInstance(dsys).rootRegions();
+ for (Region<?, ?> roots : regions) {
+ Set<Region<?, ?>> subregions = roots.subregions(true);
+ for (Region<?, ?> subroots : subregions) {
+ if (!subroots.isDestroyed() && subroots.getAttributes().getPoolName() != null
+ && subroots.getAttributes().getPoolName().equals(this.name) ) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("PoolImpl.destroy[ Region connected count:{} Region subroot closing:{} Pool Name:{} ]", cnt, subroots.getName(), this.name);
+ }
+ subroots.close();
+ }
+ }
+
+ if (!roots.isDestroyed() && roots.getAttributes().getPoolName() != null
+ && roots.getAttributes().getPoolName().equals(this.name)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("PoolImpl.destroy[ Region connected count:{} Region root closing:{} Pool Name:{} ]", cnt, roots.getName(), this.name);
+ }
+ roots.close();
+ }
+ }
+ }
+ } catch (CacheClosedException ccex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(ccex.getMessage(), ccex);
+ }
+ } catch (Exception ex) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(ex.getMessage(), ex);
+ }
+ }
+ }
+ } //end special case
+
+ cnt = getAttachCount();
+ if (cnt > 0) {
+ throw new IllegalStateException( LocalizedStrings.PoolImpl_POOL_COULD_NOT_BE_DESTROYED_BECAUSE_IT_IS_STILL_IN_USE_BY_0_REGIONS.toLocalizedString(Integer.valueOf(cnt)));
+ }
+ }
+ if (this.pm.unregister(this)) {
+ basicDestroy(keepAlive);
+ }
+ }
+
+ /**
+ * Destroys this pool but does not unregister it.
+ * This is used by the PoolManagerImpl when it wants to close all its pools.
+ */
+ public synchronized void basicDestroy(boolean keepAlive) {
+ if (!isDestroyed()) {
+ this.destroyed = true;
+ // LOG: changed from config to info
+ logger.info(LocalizedMessage.create(LocalizedStrings.PoolImpl_DESTROYING_CONNECTION_POOL_0, name));
+
+ try {
+ if (backgroundProcessor != null) {
+ backgroundProcessor.shutdown();
+ if(!backgroundProcessor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.PoolImpl_TIMEOUT_WAITING_FOR_BACKGROUND_TASKS_TO_COMPLETE));
+ }
+ }
+ } catch(RuntimeException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.PoolImpl_ERROR_ENCOUNTERED_WHILE_STOPPING_BACKGROUNDPROCESSOR), e);
+ } catch(InterruptedException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.PoolImpl_INTERRUPTED_WHILE_STOPPING_BACKGROUNDPROCESSOR), e);
+ }
+
+ try {
+ if (this.source != null) {
+ this.source.stop();
+ }
+ } catch(RuntimeException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.PoolImpl_ERROR_ENCOUNTERED_WHILE_STOPPING_CONNECTION_SOURCE), e);
+ }
+
+ try {
+ if(this.manager != null) {
+ manager.close(keepAlive);
+ }
+ } catch(RuntimeException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.PoolImpl_ERROR_ENCOUNTERED_WHILE_STOPPING_CONNECTION_MANAGER), e);
+ }
+
+ try {
+ if(this.queueManager != null) {
+ queueManager.close(keepAlive);
+ }
+ } catch(RuntimeException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.PoolImpl_ERROR_ENCOUNTERED_WHILE_STOPPING_SUBSCRIPTION_MANAGER), e);
+ }
+
+ try {
+ endpointManager.close();
+ } catch(RuntimeException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.PoolImpl_ERROR_ENCOUNTERED_WHILE_STOPPING_ENDPOINT_MANAGER), e);
+ }
+
+ try {
+ if(this.stats!=null) {
+ this.stats.close();
+ }
+ } catch(RuntimeException e) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.PoolImpl_ERROR_WHILE_CLOSING_STATISTICS), e);
+ }
+ }
+ }
+
+ public boolean isDestroyed() {
+ return destroyed;
+ }
+
+
+ private ConnectionSource getSourceImpl(LocatorDiscoveryCallback locatorDiscoveryCallback) {
+ List<InetSocketAddress> locators = getLocators();
+ if (locators.isEmpty()) {
+ return new ExplicitConnectionSourceImpl(getServers());
+ }
+ else {
+ AutoConnectionSourceImpl source = new AutoConnectionSourceImpl(locators,
+ getServerGroup(), HANDSHAKE_TIMEOUT);
+ if(locatorDiscoveryCallback != null) {
+ source.setLocatorDiscoveryCallback(locatorDiscoveryCallback);
+ }
+ return source;
+ }
+ }
+ /**
+ * Used internally by xml parsing code.
+ */
+ public void sameAs(Object obj) {
+ if (!(obj instanceof PoolImpl)) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl__0_IS_NOT_THE_SAME_AS_1_BECAUSE_IT_SHOULD_HAVE_BEEN_A_POOLIMPL
+ .toLocalizedString(new Object[] {this, obj}));
+ }
+ PoolImpl other = (PoolImpl)obj;
+ if (!getName().equals(other.getName())) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_ARE_DIFFERENT.toLocalizedString("names"));
+ }
+ if (getFreeConnectionTimeout() != other.getFreeConnectionTimeout()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("connectionTimeout"));
+ }
+ if (getLoadConditioningInterval() != other.getLoadConditioningInterval()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("connectionLifetime"));
+ }
+ if (getSocketBufferSize() != other.getSocketBufferSize()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("socketBufferSize"));
+ }
+ if (getThreadLocalConnections() != other.getThreadLocalConnections()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("threadLocalConnections"));
+ }
+ if (getReadTimeout() != other.getReadTimeout()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("readTimeout"));
+ }
+ if (getMinConnections() != other.getMinConnections()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("MinConnections"));
+ }
+ if (getMaxConnections() != other.getMaxConnections()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("MaxConnections"));
+ }
+ if (getRetryAttempts() != other.getRetryAttempts()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("RetryAttempts"));
+ }
+ if (getIdleTimeout() != other.getIdleTimeout()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("IdleTimeout"));
+ }
+ if (getPingInterval() != other.getPingInterval()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("PingInterval"));
+ }
+ if (getStatisticInterval() != other.getStatisticInterval()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("StatisticInterval"));
+ }
+ if (getSubscriptionAckInterval() != other.getSubscriptionAckInterval()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("subscriptionAckInterval"));
+ }
+ if (getSubscriptionEnabled() != other.getSubscriptionEnabled()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("subscriptionEnabled"));
+ }
+ if (getSubscriptionMessageTrackingTimeout() != other.getSubscriptionMessageTrackingTimeout()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("subscriptionMessageTrackingTimeout"));
+ }
+ if (getSubscriptionRedundancy() != other.getSubscriptionRedundancy()) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("subscriptionRedundancyLevel"));
+ }
+ if (!getServerGroup().equals(other.getServerGroup())) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_IS_DIFFERENT.toLocalizedString("serverGroup"));
+ }
+ if (!getLocators().equals(other.getLocators())) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_ARE_DIFFERENT.toLocalizedString("locators"));
+ }
+ if (!getServers().equals(other.getServers())) {
+ throw new RuntimeException(
+ LocalizedStrings.PoolImpl_0_ARE_DIFFERENT.toLocalizedString("servers"));
+ }
+ // ignore startDisabled
+ }
+
+ public PoolStats getStats() {
+ return this.stats;
+ }
+
+
+ /**
+ * Execute the given op on the servers that this pool connects to.
+ * This method is responsible for retrying the op if an attempt fails.
+ * It will only execute it once and on one server.
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ * @since 5.7
+ */
+ public Object execute(Op op) {
+ //if(multiuser)
+ //get a server from threadlocal cache else throw cacheWriterException
+ //executeOn(ServerLocation server, Op op, boolean accessed,boolean onlyUseExistingCnx)
+
+ // Retries are ignored here. FIX IT - FIXED.
+ // But this may lead to a user getting authenticated on all servers, even if
+ // a single server could have serviced all its requests.
+ authenticateIfRequired(op);
+ return executor.execute(op);
+ }
+
+ /**
+ * Execute the given op on the servers that this pool connects to.
+ * This method is responsible for retrying the op if an attempt fails.
+ * It will only execute it once and on one server.
+ * @param op the operation to execute
+ * @param retries how many times to retry the operation
+ * @return the result of execution if any; null if not
+ * @since 5.7
+ */
+ public Object execute(Op op, int retries) {
+ authenticateIfRequired(op);
+ return executor.execute(op, retries);
+ }
+
+ /**
+ * Execute the given op on the given server.
+ * @param server the server to do the execution on
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ */
+ public Object executeOn(ServerLocation server, Op op) {
+ authenticateIfRequired(server, op);
+ return executor.executeOn(server, op);
+ }
+ /**
+ * Execute the given op on the given server.
+ * @param server the server to do the execution on
+ * @param op the operation to execute
+ * @param accessed true if the connection is accessed by this execute
+ * @return the result of execution if any; null if not
+ */
+ public Object executeOn(ServerLocation server, Op op, boolean accessed,boolean onlyUseExistingCnx) {
+ authenticateIfRequired(server, op);
+ return executor.executeOn(server, op, accessed,onlyUseExistingCnx);
+ }
+
+ /**
+ * Execute the given op on the given connection.
+ * @param con the connection to do the execution on
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ */
+ public Object executeOn(Connection con, Op op) {
+ authenticateIfRequired(con.getServer(), op);
+ return executor.executeOn(con, op);
+ }
+
+ public Object executeOn(Connection con, Op op, boolean timeoutFatal) {
+ return executor.executeOn(con, op, timeoutFatal);
+ }
+
+ /**
+ * Execute the given op on all the servers that have server-to-client
+ * queues for this pool
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ * @since 5.7
+ */
+ public Object executeOnQueuesAndReturnPrimaryResult(Op op) {
+ authenticateOnAllServers(op);
+ return executor.executeOnQueuesAndReturnPrimaryResult(op);
+ }
+
+ public void executeOnAllQueueServers(Op op)
+ throws NoSubscriptionServersAvailableException, SubscriptionNotEnabledException {
+ authenticateOnAllServers(op);
+ executor.executeOnAllQueueServers(op);
+ }
+
+ /**
+ * Execute the given op on the current primary server.
+ * @param op the operation to execute
+ * @return the result of execution if any; null if not
+ */
+ public Object executeOnPrimary(Op op) {
+ return executor.executeOnPrimary(op);
+ }
+
+ public Map<ServerLocation, Endpoint> getEndpointMap() {
+ return endpointManager.getEndpointMap();
+ }
+
+ public ScheduledExecutorService getBackgroundProcessor() {
+ return backgroundProcessor;
+ }
+
+ public RegisterInterestTracker getRITracker() {
+ return this.riTracker;
+ }
+
+ /**
+ * Test hook that returns the number of servers we currently have connections to.
+ */
+ public int getConnectedServerCount() {
+ return this.endpointManager.getConnectedServerCount();
+ }
+ /**
+ * Test hook.
+ * Verify if this EventId is already present in the map or not. If it is
+ * already present then return true.
+ *
+ * @param eventId the EventId of the incoming event
+ * @return true if it is already present
+ * @since 5.1
+ */
+ public boolean verifyIfDuplicate(EventID eventId) {
+ return ((QueueStateImpl)this.queueManager.getState()).verifyIfDuplicate(eventId);
+ }
+
+ public boolean verifyIfDuplicate(EventID eventId, boolean addToMap) {
+ return ((QueueStateImpl)this.queueManager.getState()).verifyIfDuplicate(eventId);
+ }
+
+ /**
+ * Borrows a connection from the pool.. Used by gateway and tests.
+ * Any connection that is acquired using this method must be returned using
+ * returnConnection, even if it is destroyed.
+ *
+ * TODO - The use of the this method should be removed
+ * from the gateway code. This method is fine for tests,
+ * but these connections should really be managed inside
+ * the pool code. If the gateway needs to persistent connection
+ * to a single server, which should create have the OpExecutor
+ * that holds a reference to the connection (similar to the way
+ * we do with thread local connections).
+ * TODO use {@link ExecutablePool#setupServerAffinity(boolean)} for
+ * gateway code
+ */
+ public Connection acquireConnection() {
+ return manager.borrowConnection(45000L);
+ }
+
+ /**
+ * Hook to return connections that were acquired using
+ * acquireConnection.
+ * @param conn
+ */
+ public void returnConnection(Connection conn) {
+ manager.returnConnection(conn);
+ }
+
+ /**
+ * Test hook that acquires and returns a connection from the pool with a given ServerLocation.
+ */
+ public Connection acquireConnection(ServerLocation loc) {
+ return manager.borrowConnection(loc,15000L,false);
+ }
+
+ /**
+ * Test hook that returns an unnmodifiable list of the current blacklisted servers
+ */
+ public Set getBlacklistedServers() {
+ return connectionFactory.getBlackList().getBadServers();
+ }
+ /**
+ * Test hook to handle an exception that happened on the given connection
+ */
+ public void processException(Throwable e, Connection con) {
+ executor.handleException(e, con, 0, false);
+ }
+ /**
+ * Test hook that returns the ThreadIdToSequenceIdMap
+ */
+ public Map getThreadIdToSequenceIdMap() {
+ if (this.queueManager == null) return Collections.EMPTY_MAP;
+ if (this.queueManager.getState() == null) return Collections.EMPTY_MAP;
+ return this.queueManager.getState().getThreadIdToSequenceIdMap();
+ }
+
+ /**
+ * Test hook that returns true if we have a primary and its updater thread
+ * is alive.
+ */
+ public boolean isPrimaryUpdaterAlive() {
+ return ((QueueManagerImpl)this.queueManager).isPrimaryUpdaterAlive();
+ }
+ /**
+ * Test hook used to simulate a kill of the primaryEndpoint
+ */
+ public void killPrimaryEndpoint() //throws ServerException
+ {
+ boolean ok = false;
+ if (this.queueManager != null) {
+ QueueManager.QueueConnections cons = this.queueManager.getAllConnections();
+ Connection con = cons.getPrimary();
+ if (con != null) {
+ final String msg = "killing primary endpoint";
+ logger.info("<ExpectedException action=add>{}</ExpectedException>", msg);
+ Exception e = new Exception(msg);
+ try {
+ processException(e, con);
+ } catch (ServerConnectivityException expected) {
+ } finally {
+ logger.info("<ExpectedException action=remove>{}</ExpectedException>", msg);
+ }
+ // do some validation here that we are no longer connected to "sl"
+ ok = true;
+ }
+ }
+ if (!ok) {
+ throw new IllegalStateException("primaryEndpoint was null");
+ }
+ }
+
+ // Pool that are declared in a cache.xml will set this property to true.
+ private boolean declaredInXML;
+
+ public void setDeclaredInXML(boolean v) {
+ this.declaredInXML = v;
+ }
+ public boolean getDeclaredInXML() {
+ return this.declaredInXML;
+ }
+
+ // used by unit tests to confirm if readyForEvents has been called on a pool
+ private boolean readyForEventsCalled;
+
+ public boolean getReadyForEventsCalled() {
+ return this.readyForEventsCalled;
+ }
+
+ public void readyForEvents(InternalDistributedSystem system) {
+ if(!isDurableClient() || queueManager == null) {
+ return;
+ }
+ this.readyForEventsCalled = true;
+ queueManager.readyForEvents(system);
+
+ }
+
+ public boolean isDurableClient() {
+ boolean isDurable = false;
+ InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
+ DistributionConfig config = system.getConfig();
+ String durableClientId = config.getDurableClientId();
+ isDurable = durableClientId != null && durableClientId.length() > 0;
+ return isDurable;
+ }
+
+ /**
+ * Test hook that returns a string consisting of the host name and port of the primary server.
+ * Null is returned if we have no primary.
+ */
+ public String getPrimaryName() {
+ String result = null;
+ ServerLocation sl = getPrimary();
+ if (sl != null) {
+ result = sl.getHostName() + sl.getPort();
+ }
+ return result;
+ }
+ /**
+ * Test hook that returns an int which the port of the primary server.
+ * -1 is returned if we have no primary.
+ */
+ public int getPrimaryPort() {
+ int result = -1;
+ ServerLocation sl = getPrimary();
+ if (sl != null) {
+ result = sl.getPort();
+ }
+ return result;
+ }
+ /**
+ * Test hook that returns a string consisting of the host name and port of the primary server.
+ * Null is returned if we have no primary.
+ */
+ public ServerLocation getPrimary() {
+ ServerLocation result = null;
+ if (this.queueManager != null) {
+ QueueManager.QueueConnections cons = this.queueManager.getAllConnections();
+ Connection con = cons.getPrimary();
+ result = con.getServer();
+ }
+ return result;
+ }
+
+ /**
+ * Test hook to get a connection to the primary server.
+ */
+ public Connection getPrimaryConnection() {
+ if (this.queueManager != null) {
+ QueueManager.QueueConnections cons = this.queueManager.getAllConnections();
+ return cons.getPrimary();
+ }
+ return null;
+ }
+
+ /**
+ * Test hook that returns a list of strings. Each string consists of the host name and port of a redundant server.
+ * An empty list is returned if we have no redundant servers.
+ */
+ public List<String> getRedundantNames() {
+ List result = Collections.EMPTY_LIST;
+ if (this.queueManager != null) {
+ QueueManager.QueueConnections cons = this.queueManager.getAllConnections();
+ List<Connection> backupCons = cons.getBackups();
+ if (backupCons.size() > 0) {
+ result = new ArrayList(backupCons.size());
+ Iterator<Connection> it = backupCons.iterator();
+ while (it.hasNext()) {
+ Connection con = it.next();
+ ServerLocation sl = con.getServer();
+ result.add(sl.getHostName() + sl.getPort());
+ }
+ }
+ }
+ return result;
+ }
+ /**
+ * Test hook that returns a list of ServerLocation instances.
+ * Each ServerLocation describes a redundant server.
+ * An empty list is returned if we have no redundant servers.
+ */
+ public List<ServerLocation> getRedundants() {
+ List result = Collections.EMPTY_LIST;
+ if (this.queueManager != null) {
+ QueueManager.QueueConnections cons = this.queueManager.getAllConnections();
+ List<Connection> backupCons = cons.getBackups();
+ if (backupCons.size() > 0) {
+ result = new ArrayList(backupCons.size());
+ Iterator<Connection> it = backupCons.iterator();
+ while (it.hasNext()) {
+ Connection con = it.next();
+ result.add(con.getServer());
+ }
+ }
+ }
+ return result;
+ }
+ /**
+ * Test hook to find out current number of connections this pool has.
+ */
+ public int getConnectionCount() {
+ return manager.getConnectionCount();
+ }
+
+ /**
+ * Atomic counter used to keep track of services using this pool.
+ * @since 5.7
+ */
+ private final AtomicInteger attachCount = new AtomicInteger();
+ public static volatile boolean IS_INSTANTIATOR_CALLBACK = false ;
+
+ /**
+ * Returns number of services currently using/attached to this pool.
+ * <p>Made public so it can be used by tests
+ * @since 5.7
+ */
+ public int getAttachCount() {
+ return this.attachCount.get();
+ }
+ /**
+ * This needs to be called when a service (like a Region or CQService)
+ * starts using a pool.
+ * @since 5.7
+ */
+ public void attach() {
+ this.attachCount.getAndIncrement();
+ }
+ /**
+ * This needs to be called when a service (like a Region or CQService)
+ * stops using a pool.
+ * @since 5.7
+ */
+ public void detach() {
+ this.attachCount.getAndDecrement();
+ }
+
+ /**
+ * Get the connection held by this thread
+ * if we're using thread local connections
+ *
+ * This is a a hook for hydra code to pass
+ * thread local connections between threads.
+ * @return the connection from the thread local,
+ * or null if there is no thread local connection.
+ */
+ public Connection getThreadLocalConnection() {
+ return executor.getThreadLocalConnection();
+ }
+
+ /**
+ * Returns a list of ServerLocation instances;
+ * one for each server we are currently connected to.
+ */
+ public List<ServerLocation> getCurrentServers() {
+ ArrayList result = new ArrayList();
+ Map endpointMap = endpointManager.getEndpointMap();
+ result.addAll(endpointMap.keySet());
+ return result;
+ }
+ /**
+ * Test hook that returns a list of server names (host+port);
+ * one for each server we are currently connected to.
+ */
+ public List<String> getCurrentServerNames() {
+ List<ServerLocation> servers = getCurrentServers();
+ ArrayList<String> result = new ArrayList(servers.size());
+ Iterator it = servers.iterator();
+ while (it.hasNext()) {
+ ServerLocation sl = (ServerLocation)it.next();
+ String name = sl.getHostName() + sl.getPort();
+ result.add(name);
+ }
+ return result;
+ }
+
+ public EndpointManager getEndpointManager() {
+ return endpointManager;
+ }
+
+ /**
+ * Fetch the connection source for this pool
+ * @return the source
+ */
+ public ConnectionSource getConnectionSource() {
+ return source;
+ }
+
+ private static void setTEST_DURABLE_IS_NET_DOWN(boolean v) {
+ TEST_DURABLE_IS_NET_DOWN = v;
+ }
+
+ /**
+ * test hook
+ */
+ public void endpointsNetDownForDUnitTest() {
+ logger.debug("PoolImpl - endpointsNetDownForDUnitTest");
+ setTEST_DURABLE_IS_NET_DOWN(true);
+ try {
+ java.lang.Thread.sleep(this.pingInterval * 2);
+ }
+ catch (java.lang.InterruptedException ex) {
+ // do nothing.
+ }
+
+ Map endpoints = endpointManager.getEndpointMap();
+ for(Iterator itr = endpoints.values().iterator(); itr.hasNext();) {
+ Endpoint endpoint = (Endpoint) itr.next();
+ logger.debug("PoolImpl Simulating crash of endpoint {}", endpoint);
+ endpointManager.serverCrashed(endpoint);
+ }
+ }
+ /**
+ * test hook
+ */
+ public void endpointsNetUpForDUnitTest() {
+ setTEST_DURABLE_IS_NET_DOWN(false);
+ try {
+ java.lang.Thread.sleep(this.pingInterval * 2);
+ }
+ catch (java.lang.InterruptedException ex) {
+ // do nothing.
+ }
+ }
+ /**
+ * test hook
+ */
+ public int getInvalidateCount() {
+ return ((QueueStateImpl)this.queueManager.getState()).getInvalidateCount();
+ }
+ /**
+ * Set the connection held by this thread
+ * if we're using thread local connections
+ *
+ * This is a a hook for hydra code to pass
+ * thread local connections between threads.
+ */
+ public void setThreadLocalConnection(Connection conn) {
+ executor.setThreadLocalConnection(conn);
+ }
+
+ public ServerLocation getServerAffinityLocation() {
+ return executor.getServerAffinityLocation();
+ }
+
+ public void setServerAffinityLocation(ServerLocation serverLocation) {
+ executor.setServerAffinityLocation(serverLocation);
+ }
+
+ public ServerLocation getNextOpServerLocation() {
+ return executor.getNextOpServerLocation();
+ }
+
+ /**
+ * Test hook for getting the client proxy membership id from this proxy.
+ */
+ public ClientProxyMembershipID getProxyID() {
+ return proxyId;
+ }
+
+
+ public void emergencyClose() {
+ destroyed = true;
+ manager.emergencyClose();
+ queueManager.emergencyClose();
+ }
+
+ ///////////////////// start test hooks ///////////////////////
+ /**
+ * A debug flag used for testing used in BridgeObserver
+ */
+ public static volatile boolean AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false;
+
+ /**
+ * A debug flag used for testing used in BridgeObserver
+ */
+ public static volatile boolean BEFORE_REGISTER_CALLBACK_FLAG = false;
+
+ /**
+ * A debug flag used for testing used in BridgeObserver
+ */
+ public static volatile boolean BEFORE_RECOVER_INTERST_CALLBACK_FLAG = false;
+
+ /**
+ * A debug flag used for testing used in BridgeObserver
+ */
+ public static volatile boolean AFTER_REGISTER_CALLBACK_FLAG = false;
+
+ /**
+ * A debug flag used for testing used in BridgeObserver
+ */
+ public static volatile boolean BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false;
+
+ /**
+ * A debug flag used for testing used in BridgeObserver
+ */
+ public static volatile boolean BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG = false;
+ /**
+ * A debug flag used for testing used in BridgeObserver
+ */
+ public static volatile boolean AFTER_QUEUE_DESTROY_MESSAGE_FLAG = false;
+
+ /**
+ * Test hook flag to notify observer(s) that a primary is recovered
+ * either from a backup or from a new connection.
+ */
+ public static volatile boolean AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG = false;
+
+ public static abstract class PoolTask implements Runnable {
+
+ public final void run() {
+ try {
+ run2();
+ } catch(VirtualMachineError e) {
+ SystemFailure.initiateFailure(e);
+ throw e;
+ }
+ catch (CancelException e) {
+// throw e;
+ if (logger.isDebugEnabled()) {
+ logger.debug("Pool task <{}> cancelled", this, logger.isTraceEnabled() ? e : null);
+ }
+ } catch(Throwable t) {
+ logger.error(LocalizedMessage.create(LocalizedStrings.PoolImpl_UNEXPECTED_ERROR_IN_POOL_TASK_0, this), t);
+ }
+
+ }
+
+ public abstract void run2();
+ }
+
+ ///////////////////// end test hooks ///////////////////////
+
+ protected class PublishClientStatsTask extends PoolTask {
+ @Override
+ public void run2() {
+ ClientStatsManager.publishClientStats(PoolImpl.this);
+ }
+ }
+
+ /**
+ * A cancel criterion that checks both the pool and the cache
+ * for canceled status.
+ */
+ protected class PoolOrCacheStopper extends CancelCriterion {
+
+ @Override
+ public String cancelInProgress() {
+ return getPoolOrCacheCancelInProgress();
+ }
+
+ @Override
+ public RuntimeException generateCancelledException(Throwable e) {
+ return generatePoolOrCacheCancelledException(e);
+ }
+
+ }
+
+ /**
+ * A cancel criterion that checks only if this pool has been
+ * closed. This is necessary because there are some things that
+ * we want to allow even after the cache has started closing.
+ */
+ protected class Stopper extends CancelCriterion {
+
+ @Override
+ public String cancelInProgress() {
+ if(destroyed) {
+ return "Pool " + PoolImpl.this + " is shut down";
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public RuntimeException generateCancelledException(Throwable t) {
+ String reason = cancelInProgress();
+ if (reason == null) {
+ return null;
+ }
+ return new PoolCancelledException(reason, t);
+ }
+ }
+
+ public static void loadEmergencyClasses() {
+ QueueManagerImpl.loadEmergencyClasses();
+ ConnectionManagerImpl.loadEmergencyClasses();
+ EndpointManagerImpl.loadEmergencyClasses();
+ }
+
+ /**
+ * Returns the QueryService, that can be used to execute Query functions on
+ * the servers associated with this pool.
+ * @return the QueryService
+ */
+ public QueryService getQueryService() {
+ Cache cache = CacheFactory.getInstance(InternalDistributedSystem.getAnyInstance());
+ DefaultQueryService queryService = new DefaultQueryService((InternalCache) cache);
+ queryService.setPool(this);
+ return queryService;
+ }
+
+ public RegionService createAuthenticatedCacheView(Properties properties) {
+ if (!this.multiuserSecureModeEnabled) {
+ throw new UnsupportedOperationException(
+ "Operation not supported when multiuser-authentication is false.");
+ }
+ if (properties == null || properties.isEmpty()) {
+ throw new IllegalArgumentException("Security properties cannot be empty.");
+ }
+ Cache cache = CacheFactory.getInstance(InternalDistributedSystem
+ .getAnyInstance());
+
+ Properties props = new Properties();
+ for (Entry<Object, Object> entry : properties.entrySet()) {
+ props.setProperty((String)entry.getKey(), (String)entry.getValue());
+ }
+ ProxyCache proxy = new ProxyCache(props, (GemFireCacheImpl)cache, this);
+ synchronized (this.proxyCacheList) {
+ this.proxyCacheList.add(proxy);
+ }
+ return proxy;
+ }
+
+ private volatile CancelCriterion cacheCriterion = null;
+
+ private RuntimeException generatePoolOrCacheCancelledException(Throwable e) {
+ RuntimeException re = getCancelCriterion().generateCancelledException(e);
+ if(re != null) {
+ return re;
+ }
+ Cache cache = GemFireCacheImpl.getInstance();
+ if (cache == null) {
+ if (cacheCriterion != null) {
+ return cacheCriterion.generateCancelledException(e);
+ }
+ } else {
+ if (cacheCriterion == null) {
+ cacheCriterion = cache.getCancelCriterion();
+ } else if (cacheCriterion != cache.getCancelCriterion()) {
+ /*
+ * If the cache instance has somehow changed, we need to get a reference
+ * to the new criterion. This is pretty unlikely because the cache
+ * closes all the pools when it shuts down, but I wanted to be safe.
+ */
+ cacheCriterion = cache.getCancelCriterion();
+ }
+ return cacheCriterion.generateCancelledException(e);
+ }
+ return null;
+ }
+
+ public String getPoolOrCacheCancelInProgress() {
+ String reason = null;
+ try {
+ reason = getCancelCriterion().cancelInProgress();
+ if(reason!=null) {
+ return reason;
+ }
+ Cache cache = GemFireCacheImpl.getInstance();
+ if(cache==null) {
+ if(cacheCriterion!=null) {
+ return cacheCriterion.cancelInProgress();
+ }
+ return null;
+ } else {
+ if(cacheCriterion==null) {
+ cacheCriterion = cache.getCancelCriterion();
+ } else if(cacheCriterion!=cache.getCancelCriterion()) {
+ /*
+ If the cache instance has somehow changed, we need to
+ get a reference to the new criterion. This is pretty unlikely
+ because the cache closes all the pools when it shuts down,
+ but I wanted to be safe.
+ */
+ cacheCriterion = cache.getCancelCriterion();
+ }
+ return cacheCriterion.cancelInProgress();
+ }
+ } catch(CancelException cce) {
+ if(cce.getMessage()!=null) {
+ return cce.getMessage();
+ } else {
+ return "cache is closed";
+ }
+ }
+ }
+
+ public ArrayList<ProxyCache> getProxyCacheList() {
+ return this.proxyCacheList;
+ }
+
+ private void authenticateIfRequired(Op op) {
+ authenticateIfRequired(null, op);
+ }
+
+ /**
+ * Assert thread-local var is not null, if it has
+ * multiuser-authentication set to true.
+ *
+ * If serverLocation is non-null, check if the the user is authenticated on
+ * that server. If not, authenticate it and return.
+ *
+ * @param serverLocation
+ * @param op
+ */
+ private void authenticateIfRequired(ServerLocation serverLocation, Op op) {
+ if (this.multiuserSecureModeEnabled && op instanceof AbstractOp
+ && ((AbstractOp)op).needsUserId()) {
+ UserAttributes userAttributes = UserAttributes.userAttributes.get();
+ if (userAttributes == null) {
+ throw new UnsupportedOperationException(LocalizedStrings.MultiUserSecurityEnabled_USE_POOL_API.toLocalizedString());
+ }
+ if (serverLocation != null) {
+ if (!userAttributes.getServerToId().containsKey(serverLocation)) {
+ Long userId = (Long)AuthenticateUserOp.executeOn(serverLocation,
+ this, userAttributes.getCredentials());
+ if (userId != null) {
+ userAttributes.setServerToId(serverLocation, userId);
+ }
+ }
+ }
+ }
+ }
+
+ private void authenticateOnAllServers(Op op) {
+ if (this.multiuserSecureModeEnabled && ((AbstractOp)op).needsUserId()) {
+ UserAttributes userAttributes = UserAttributes.userAttributes.get();
+ if (userAttributes != null) {
+ ConcurrentHashMap<ServerLocation, Long> map = userAttributes
+ .getServerToId();
+
+ if (this.queueManager == null) {
+ throw new SubscriptionNotEnabledException();
+ }
+ Connection primary = this.queueManager.getAllConnectionsNoWait()
+ .getPrimary();
+ if (primary != null && !map.containsKey(primary.getServer())) {
+ Long userId = (Long)AuthenticateUserOp.executeOn(primary.getServer(),
+ this, userAttributes.getCredentials());
+ if (userId != null) {
+ map.put(primary.getServer(), userId);
+ }
+ }
+
+ List<Connection> backups = this.queueManager.getAllConnectionsNoWait().getBackups();
+ for (int i = 0; i < backups.size(); i++) {
+ Connection conn = backups.get(i);
+ if (!map.containsKey(conn.getServer())) {
+ Long userId = (Long)AuthenticateUserOp.executeOn(conn.getServer(),
+ this, userAttributes.getCredentials());
+ if (userId != null) {
+ map.put(conn.getServer(), userId);
+ }
+ }
+ }
+ } else {
+ throw new UnsupportedOperationException(LocalizedStrings.MultiUserSecurityEnabled_USE_POOL_API.toLocalizedString());
+ }
+ }
+ }
+
+ public void setPendingEventCount(int count) {
+ this.primaryQueueSize.set(count);
+ }
+
+ public int getPendingEventCount() {
+ if(!isDurableClient() || this.queueManager == null) {
+ throw new IllegalStateException(LocalizedStrings.PoolManagerImpl_ONLY_DURABLE_CLIENTS_SHOULD_CALL_GETPENDINGEVENTCOUNT.toLocalizedString());
+ }
+ if (this.readyForEventsCalled) {
+ throw new IllegalStateException(LocalizedStrings.PoolManagerImpl_GETPENDINGEVENTCOUNT_SHOULD_BE_CALLED_BEFORE_INVOKING_READYFOREVENTS.toLocalizedString());
+ }
+ return this.primaryQueueSize.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
new file mode 100644
index 0000000..48c9e0e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/PrimaryAckOp.java
@@ -0,0 +1,92 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Send the primary server acknowledgement on the events this client
+ * has received and processed from it.
+ * @author darrel
+ * @since 5.7
+ */
+public class PrimaryAckOp {
+ /**
+ * Send the primary server acknowledgement on the events this client
+ * has received and processed from it
+ * using connections from the given pool
+ * to communicate with the server.
+ * @param connection
+ * @param pool the pool to use to communicate with the server.
+ * @param events list of events to acknowledge
+ */
+ public static void execute(Connection connection, ExecutablePool pool,
+ List events)
+ {
+ AbstractOp op = new PrimaryAckOpImpl(events);
+ pool.executeOn(connection, op);
+ }
+
+ private PrimaryAckOp() {
+ // no instances allowed
+ }
+
+ private static class PrimaryAckOpImpl extends AbstractOp {
+ /**
+ * @throws com.gemstone.gemfire.SerializationException if serialization fails
+ */
+ public PrimaryAckOpImpl(List events) {
+ super(MessageType.PERIODIC_ACK, events.size());
+ for (Iterator i = events.iterator(); i.hasNext();) {
+ getMessage().addObjPart(i.next());
+ }
+ }
+
+ @Override
+ protected void processSecureBytes(Connection cnx, Message message)
+ throws Exception {
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
+ getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+ getMessage().send(false);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ processAck(msg, "primaryAck");
+ return null;
+ }
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return false;
+ }
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startPrimaryAck();
+ }
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endPrimaryAckSend(start, hasFailed());
+ }
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endPrimaryAck(start, hasTimedOut(), hasFailed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCache.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCache.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCache.java
new file mode 100755
index 0000000..1fb9cf0
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCache.java
@@ -0,0 +1,239 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionService;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.internal.ProxyQueryService;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.pdx.PdxInstance;
+import com.gemstone.gemfire.pdx.PdxInstanceFactory;
+import com.gemstone.gemfire.pdx.internal.PdxInstanceFactoryImpl;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * A wrapper class over an actual Cache instance. This is used when the
+ * multiuser-authentication attribute is set to true. Application must use
+ * its {@link #getRegion(String)} API instead that of actual Cache instance for
+ * getting a reference to Region instances, to perform operations on server.
+ *
+ * TODO Avoid creating multiple instances of ProxyCache for a single user.
+ *
+ * @see ClientCache#createAuthenticatedView(Properties)
+ * @see ProxyQueryService
+ * @see ProxyRegion
+ * @since 6.5
+ */
+public class ProxyCache implements RegionService {
+
+ private final GemFireCacheImpl cache;
+ private UserAttributes userAttributes;
+ private ProxyQueryService proxyQueryService;
+ private boolean isClosed = false;
+ private final Stopper stopper = new Stopper();
+
+ public ProxyCache(Properties properties, GemFireCacheImpl cache, PoolImpl pool) {
+ this.userAttributes = new UserAttributes(properties, pool);
+ this.cache = cache;
+ }
+
+ public void close() {
+ close(false);
+ }
+
+ public void close(boolean keepAlive) {
+ if (this.isClosed) {
+ return;
+ }
+ // It should go to all the servers it has authenticated itself on and ask
+ // them to clean-up its entry from their auth-data structures.
+ try {
+ if (this.proxyQueryService != null) {
+ this.proxyQueryService.closeCqs(keepAlive);
+ }
+ UserAttributes.userAttributes.set(this.getUserAttributes());
+ Iterator<ServerLocation> iter = this.userAttributes.getServerToId()
+ .keySet().iterator();
+ while (iter.hasNext()) {
+ ProxyCacheCloseOp.executeOn(iter.next(), (PoolImpl)this.userAttributes.getPool(),
+ getProperties(), keepAlive);
+ }
+ ArrayList<ProxyCache> proxyCache = ((PoolImpl)this.userAttributes.getPool()).getProxyCacheList();
+ synchronized (proxyCache) {
+ proxyCache.remove(this);
+ }
+ } finally {
+ // @todo I think some NPE will be caused by this code.
+ // It would be safer to not null things out.
+ // It is really bad that we null out and then set isClosed true.
+ this.isClosed = true;
+ this.proxyQueryService = null;
+ this.userAttributes.setCredentials(null);
+ this.userAttributes = null;
+ UserAttributes.userAttributes.set(null);
+ }
+ }
+
+ // TODO remove this method
+ public String getName() {
+ return this.cache.getName();
+ }
+
+ public QueryService getQueryService() {
+ preOp();
+ if (this.proxyQueryService == null) {
+ this.proxyQueryService = new ProxyQueryService(this, userAttributes
+ .getPool().getQueryService());
+ }
+ return this.proxyQueryService;
+ }
+
+ public <K, V> Region<K, V> getRegion(String path) {
+ preOp();
+ // TODO Auto-generated method stub
+ // ProxyRegion region = this.proxyRegionList.get(path);
+ // if (region != null) {
+ // return region;
+ // }
+ // else {
+ if (this.cache.getRegion(path) == null) {
+ return null;
+ } else {
+ if (!this.cache.getRegion(path).getAttributes().getDataPolicy().isEmpty()) {
+ throw new IllegalStateException(
+ "Region's data-policy must be EMPTY when multiuser-authentication is true");
+ }
+ return new ProxyRegion(this, this.cache.getRegion(path));
+ }
+ // }
+ }
+
+ public boolean isClosed() {
+ return this.isClosed;
+ }
+
+ public void setProperties(Properties properties) {
+ preOp();
+ this.userAttributes.setCredentials(properties);
+ }
+
+ public Properties getProperties() {
+ preOp();
+ return this.userAttributes.getCredentials();
+ }
+
+ public void setUserAttributes(UserAttributes userAttributes) {
+ preOp();
+ this.userAttributes = userAttributes;
+ }
+
+ public UserAttributes getUserAttributes() {
+ preOp();
+ return this.userAttributes;
+ }
+
+ public Object getUserId(Object key) {
+ preOp();
+ if (!(key instanceof ServerLocation)) {
+ throw new IllegalArgumentException(
+ "Key must be of type ServerLocation, but is " + key.getClass());
+ }
+ return this.userAttributes.getServerToId().get(key);
+ }
+
+ private void preOp() {
+ this.stopper.checkCancelInProgress(null);
+ }
+
+ protected class Stopper extends CancelCriterion {
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.CancelCriterion#cancelInProgress()
+ */
+ @Override
+ public String cancelInProgress() {
+ String reason = cache.getCancelCriterion().cancelInProgress();
+ if (reason != null) {
+ return reason;
+ }
+ if (isClosed()) {
+ return "Authenticated cache view is closed for this user.";
+ }
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.CancelCriterion#generateCancelledException(java.lang.Throwable)
+ */
+ @Override
+ public RuntimeException generateCancelledException(Throwable e) {
+ String reason = cancelInProgress();
+ if (reason == null) {
+ return null;
+ }
+ RuntimeException result = cache.getCancelCriterion().generateCancelledException(e);
+ if (result != null) {
+ return result;
+ }
+ if (e == null) {
+ // Caller did not specify any root cause, so just use our own.
+ return new CacheClosedException(reason);
+ }
+
+ try {
+ return new CacheClosedException(reason, e);
+ }
+ catch (IllegalStateException e2) {
+ // Bug 39496 (Jrockit related) Give up. The following
+ // error is not entirely sane but gives the correct general picture.
+ return new CacheClosedException(reason);
+ }
+ }
+ }
+
+ public CancelCriterion getCancelCriterion() {
+ return this.stopper;
+ }
+
+ /* (non-Javadoc)
+ * @see com.gemstone.gemfire.cache.RegionService#rootRegions()
+ */
+ public Set<Region<?, ?>> rootRegions() {
+ preOp();
+ Set<Region<?, ?>> rRegions = new HashSet<Region<?,?>>();
+ Iterator<LocalRegion> it = this.cache.rootRegions().iterator();
+ while (it.hasNext()) {
+ LocalRegion lr = it.next();
+ if (!lr.getAttributes().getDataPolicy().withStorage()) {
+ rRegions.add(new ProxyRegion(this, lr));
+ }
+ }
+ return Collections.unmodifiableSet(rRegions);
+ }
+
+ public PdxInstanceFactory createPdxInstanceFactory(String className) {
+ return PdxInstanceFactoryImpl.newCreator(className, true);
+ }
+ public PdxInstanceFactory createPdxInstanceFactory(String className, boolean b) {
+ return PdxInstanceFactoryImpl.newCreator(className, b);
+ }
+ public PdxInstance createPdxEnum(String className, String enumName, int enumOrdinal) {
+ return PdxInstanceFactoryImpl.createPdxEnum(className, enumName, enumOrdinal, this.cache);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
new file mode 100644
index 0000000..5271b73
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ProxyCacheCloseOp.java
@@ -0,0 +1,115 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.Properties;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.HeapDataOutputStream;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+
+public class ProxyCacheCloseOp {
+
+ public static Object executeOn(ServerLocation location, ExecutablePool pool,
+ Properties securityProps, boolean keepAlive) {
+ AbstractOp op = new ProxyCacheCloseOpImpl(pool, securityProps, keepAlive);
+ return pool.executeOn(location, op);
+ }
+
+ private ProxyCacheCloseOp() {
+ // no instances allowed
+ }
+
+ static class ProxyCacheCloseOpImpl extends AbstractOp {
+
+ public ProxyCacheCloseOpImpl(ExecutablePool pool, Properties securityProps,
+ boolean keepAlive) {
+ super(MessageType.REMOVE_USER_AUTH, 1);
+ getMessage().setEarlyAck(Message.MESSAGE_HAS_SECURE_PART);
+ getMessage().addBytesPart(keepAlive ? new byte[] {1} : new byte[] {0});
+ }
+
+ @Override
+ protected boolean needsUserId() {
+ return false;
+ }
+
+ @Override
+ protected void sendMessage(Connection cnx) throws Exception {
+ HeapDataOutputStream hdos = new HeapDataOutputStream(Version.CURRENT);
+ byte[] secureBytes = null;
+ hdos.writeLong(cnx.getConnectionID());
+ Object userId = UserAttributes.userAttributes.get().getServerToId().get(cnx.getServer());
+ if (userId == null) {
+ // This will ensure that this op is retried on another server, unless
+ // the retryCount is exhausted. Fix for Bug 41501
+ throw new ServerConnectivityException(
+ "Connection error while authenticating user");
+ }
+ hdos.writeLong((Long)userId);
+ try {
+ secureBytes = ((ConnectionImpl)cnx).getHandShake().encryptBytes(
+ hdos.toByteArray());
+ } finally {
+ hdos.close();
+ }
+ getMessage().setSecurePart(secureBytes);
+ getMessage().send(false);
+ }
+
+ @Override
+ protected Object processResponse(Message msg) throws Exception {
+ Part part = msg.getPart(0);
+ final int msgType = msg.getMessageType();
+ if (msgType == MessageType.REPLY) {
+ return part.getObject();
+ }
+ else if (msgType == MessageType.EXCEPTION) {
+ String s = "While performing a remote proxy cache close";
+ throw new ServerOperationException(s, (Throwable)part.getObject());
+ // Get the exception toString part.
+ // This was added for c++ thin client and not used in java
+ // Part exceptionToStringPart = msg.getPart(1);
+ }
+ else if (isErrorResponse(msgType)) {
+ throw new ServerOperationException(part.getString());
+ }
+ else {
+ throw new InternalGemFireError("Unexpected message type "
+ + MessageType.getString(msgType));
+ }
+ }
+
+ @Override
+ protected boolean isErrorResponse(int msgType) {
+ return msgType == MessageType.REQUESTDATAERROR;
+ }
+
+ @Override
+ protected long startAttempt(ConnectionStats stats) {
+ return stats.startGet();
+ }
+
+ @Override
+ protected void endSendAttempt(ConnectionStats stats, long start) {
+ stats.endGetSend(start, hasFailed());
+ }
+
+ @Override
+ protected void endAttempt(ConnectionStats stats, long start) {
+ stats.endGet(start, hasTimedOut(), hasFailed());
+ }
+ }
+
+}