You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:11 UTC
[06/51] [partial] incubator-geode git commit: Init
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManagerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManagerImpl.java
new file mode 100644
index 0000000..71357e3
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManagerImpl.java
@@ -0,0 +1,1478 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.GemFireException;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.NoSubscriptionServersAvailableException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker.RegionInterestEntry;
+import com.gemstone.gemfire.cache.client.internal.ServerBlackList.BlackListListener;
+import com.gemstone.gemfire.cache.client.internal.ServerBlackList.BlackListListenerAdapter;
+import com.gemstone.gemfire.cache.client.internal.ServerBlackList.FailureTracker;
+import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.ClientCQ;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.BridgeObserver;
+import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientUpdater;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+import com.gemstone.org.jgroups.util.StringId;
+
+/**
+ * Manages Client Queues. Responsible for creating callback connections and
+ * satisfying redundancy requirements.
+ *
+ * @author dsmith
+ * @since 5.7
+ *
+ */
+public class QueueManagerImpl implements QueueManager {
+ private static final Logger logger = LogService.getLogger();
+
+// private static final long SERVER_LOCATION_TIMEOUT = Long.getLong(
+// "gemfire.QueueManagerImpl.SERVER_LOCATION_TIMEOUT", 120000).longValue();
+ // Orders the ServerQueueStatus keys of the sorted map built in initializeConnections().
+ private static final Comparator QSIZE_COMPARATOR = new QSizeComparator();
+
+ // Delay handed to scheduleRedundancySatisfierIfNeeded() and to the ServerBlackList.
+ protected final long redundancyRetryInterval;
+ private final EndpointManager endpointManager;
+ // Forwards endpointCrashed notifications from the endpoint manager to this instance.
+ private final EndpointManager.EndpointListenerAdapter endpointListener;
+ // Used by findQueueServers() to look up candidate queue servers.
+ private final ConnectionSource source;
+ // Number of backup queues wanted; -1 is special-cased throughout this class.
+ private final int redundancyLevel;
+ protected final ConnectionFactory factory;
+ private final InternalLogWriter securityLogger;
+ private final ClientProxyMembershipID proxyId;
+ protected final InternalPool pool;
+ private final QueueStateImpl state;
+ // The four print* flags throttle repeated log output: each is cleared once its
+ // message has been logged and set back to true after a successful recovery.
+ private boolean printPrimaryNotFoundError;
+ private boolean printRedundancyNotSatisfiedError;
+ private boolean printRecoveringPrimary;
+ private boolean printRecoveringRedundant;
+ protected final ServerBlackList blackList;
+ // Lock which guards updates to queueConnections.
+ // Also threads calling getAllConnections will wait on this
+ // lock until there is a primary.
+ protected final Object lock = new Object();
+
+ // Counted down in the finally block of start(), whether or not startup succeeded.
+ protected final CountDownLatch initializedLatch = new CountDownLatch(1);
+
+ // Single-threaded executor created in start(); stays null if start() was never called.
+ private ScheduledThreadPoolExecutor recoveryThread;
+ // Set to true by readyForEvents(); read when promoting or creating a primary.
+ private volatile boolean sentClientReady;
+
+ // queueConnections is maintained by using copy-on-write
+ protected volatile ConnectionList queueConnections = new ConnectionList();
+ protected volatile RedundancySatisfierTask redundancySatisfierTask = null;
+ // Set by close() and emergencyClose(); releases threads waiting in getAllConnections().
+ private volatile boolean shuttingDown;
+
+ /**
+  * Creates a queue manager for the given pool. No connections are opened
+  * here; that happens in {@link #start(ScheduledExecutorService)}.
+  *
+  * @param pool the pool this manager serves
+  * @param endpointManager source of endpoint-crash notifications
+  * @param source used to look up candidate queue servers
+  * @param factory creates the client/server connections
+  * @param queueRedundancyLevel number of backup queues wanted (-1 is special-cased by this class)
+  * @param redundancyRetryInterval delay used when rescheduling redundancy recovery
+  * @param securityLogger log writer returned by {@link #getSecurityLogger()}
+  * @param proxyId membership id passed to the connection source when finding servers
+  */
+ public QueueManagerImpl(
+     InternalPool pool,
+     EndpointManager endpointManager,
+     ConnectionSource source,
+     ConnectionFactory factory,
+     int queueRedundancyLevel,
+     long redundancyRetryInterval,
+     InternalLogWriter securityLogger,
+     ClientProxyMembershipID proxyId) {
+   // collaborators and configuration
+   this.pool = pool;
+   this.endpointManager = endpointManager;
+   this.source = source;
+   this.factory = factory;
+   this.proxyId = proxyId;
+   this.securityLogger = securityLogger;
+   this.redundancyLevel = queueRedundancyLevel;
+   this.redundancyRetryInterval = redundancyRetryInterval;
+   this.blackList = new ServerBlackList(redundancyRetryInterval);
+
+   // every throttled log message may be printed once to begin with
+   this.printPrimaryNotFoundError = true;
+   this.printRedundancyNotSatisfiedError = true;
+   this.printRecoveringPrimary = true;
+   this.printRecoveringRedundant = true;
+
+   // route crash notifications from the endpoint manager back to this instance
+   this.endpointListener = new EndpointManager.EndpointListenerAdapter() {
+     @Override
+     public void endpointCrashed(Endpoint crashedEndpoint) {
+       QueueManagerImpl.this.endpointCrashed(crashedEndpoint);
+     }
+   };
+
+   // created last because it receives a reference to this manager
+   this.state = new QueueStateImpl(this);
+ }
+
+ /**
+  * @return the pool this queue manager was created for
+  */
+ public InternalPool getPool() {
+   return this.pool;
+ }
+
+ /**
+  * Reports whether the updater attached to the current primary queue
+  * connection is alive.
+  *
+  * @return {@code false} when there is no primary or the primary has no
+  *         updater; otherwise the updater's own {@code isAlive()} result
+  */
+ public boolean isPrimaryUpdaterAlive() {
+   QueueConnectionImpl currentPrimary =
+       (QueueConnectionImpl) queueConnections.getPrimary();
+   if (currentPrimary == null) {
+     return false;
+   }
+   ClientUpdater updater = currentPrimary.getUpdater();
+   if (updater == null) {
+     return false;
+   }
+   return ((CacheClientUpdater) updater).isAlive();
+ }
+
+ /**
+  * @return the current connection list, without waiting for a primary to
+  *         become available
+  */
+ public QueueConnections getAllConnectionsNoWait() {
+   return this.queueConnections;
+ }
+
+ public QueueConnections getAllConnections() {
+
+ ConnectionList snapshot = queueConnections;
+ if (snapshot.getPrimary() == null) {
+ // wait for a new primary to become available.
+ synchronized (lock) {
+ snapshot = queueConnections;
+ while (snapshot.getPrimary() == null
+ && !snapshot.primaryDiscoveryFailed() && !shuttingDown && pool.getPoolOrCacheCancelInProgress()==null) {
+ try {
+ lock.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ snapshot = queueConnections;
+ }
+ }
+ }
+
+ if (snapshot.getPrimary() == null) {
+ pool.getCancelCriterion().checkCancelInProgress(null);
+ GemFireException exception = snapshot.getPrimaryDiscoveryException();
+ if(exception == null || exception instanceof NoSubscriptionServersAvailableException) {
+ exception = new NoSubscriptionServersAvailableException(exception);
+ }
+ else {
+ exception = new ServerConnectivityException(exception.getMessage(), exception);
+ }
+ throw exception;
+ }
+
+ return snapshot;
+ }
+
+ /**
+  * @return the security log writer supplied at construction time
+  */
+ public InternalLogWriter getSecurityLogger() {
+   return this.securityLogger;
+ }
+
+ public void close(boolean keepAlive) {
+ endpointManager.removeListener(endpointListener);
+ synchronized (lock) {
+ shuttingDown = true;
+ if (redundancySatisfierTask != null) {
+ redundancySatisfierTask.cancel();
+ }
+ lock.notifyAll();
+ }
+ if (recoveryThread != null) {
+ // it will be null if we never called start
+ recoveryThread.shutdown();
+ }
+ if (recoveryThread != null) {
+ try {
+ if(!recoveryThread.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_TIMEOUT_WAITING_FOR_RECOVERY_THREAD_TO_COMPLETE));
+ }
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ logger.debug("Interrupted waiting for recovery thread termination");
+ }
+ }
+
+ QueueConnectionImpl primary = (QueueConnectionImpl) queueConnections
+ .getPrimary();
+ if(logger.isDebugEnabled()) {
+ logger.debug("QueueManagerImpl - closing connections with keepAlive={}", keepAlive);
+ }
+ if (primary != null) {
+ try {
+ if(logger.isDebugEnabled()) {
+ logger.debug("QueueManagerImpl - closing primary {}", primary);
+ }
+ primary.internalClose(keepAlive);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.QueueManagerImpl_ERROR_CLOSING_PRIMARY_CONNECTION_TO_0,
+ primary.getEndpoint()), e);
+ }
+ }
+
+ List backups = queueConnections.getBackups();
+ for (Iterator itr = backups.iterator(); itr.hasNext();) {
+ QueueConnectionImpl backup = (QueueConnectionImpl) itr.next();
+ if (backup != null) {
+ try {
+ if(logger.isDebugEnabled()) {
+ logger.debug("QueueManagerImpl - closing backup {}", backup);
+ }
+ backup.internalClose(keepAlive);
+ } catch (Exception e) {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.QueueManagerImpl_ERROR_CLOSING_BACKUP_CONNECTION_TO_0,
+ backup.getEndpoint()), e);
+ }
+ }
+ }
+ }
+
+
+ /**
+  * Marks this manager as shutting down and calls {@code emergencyClose()} on
+  * the primary and on every backup connection.
+  *
+  * The primary may be absent (never established, or already removed after a
+  * crash), so it is checked for null before being closed; previously this
+  * method threw a NullPointerException in that situation and never reached
+  * the backups.
+  */
+ public void emergencyClose() {
+   shuttingDown = true;
+   // read the volatile list once so primary and backups come from the same snapshot
+   ConnectionList connections = queueConnections;
+   Connection primary = connections.getPrimary();
+   if (primary != null) {
+     primary.emergencyClose();
+   }
+   List backups = connections.getBackups();
+   // NOTE(review): index loop kept from the original; presumably it avoids
+   // allocating an iterator on the emergency path - confirm before changing.
+   for (int i = 0; i < backups.size(); i++) {
+     Connection backup = (Connection) backups.get(i);
+     if (backup != null) {
+       backup.emergencyClose();
+     }
+   }
+ }
+
+ /**
+  * Starts queue management: starts the blacklist, registers the endpoint
+  * listener, creates the dedicated recovery executor, starts the queue state,
+  * opens the initial connections and schedules the redundancy satisfier.
+  * {@code initializedLatch} is counted down whether or not startup succeeds.
+  *
+  * @param background executor handed to the blacklist and to the queue state
+  */
+ public void start(ScheduledExecutorService background) {
+   try {
+     blackList.start(background);
+     endpointManager.addListener(endpointListener);
+
+     // Use a separate timer for queue management tasks
+     // We don't want primary recovery (and therefore user threads) to wait for
+     // things like pinging connections for health checks.
+     final String threadName = "queueTimer-" + this.pool.getName();
+     ThreadFactory daemonThreads = new ThreadFactory() {
+       public Thread newThread(Runnable task) {
+         Thread worker = new Thread(task, threadName);
+         worker.setDaemon(true);
+         return worker;
+       }
+     };
+     this.recoveryThread = new ScheduledThreadPoolExecutor(1, daemonThreads);
+     this.recoveryThread.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+
+     // TODO - use yet another Timer or the like for these tasks? We know
+     // we don't want them in the recoveryThread, because the
+     // ThreadIdToSequenceIdExpiryTask will wait for primary recovery.
+     getState().start(background, getPool().getSubscriptionAckInterval());
+
+     // initialize connections
+     initializeConnections();
+
+     scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);
+
+     // When a server is removed from the blacklist, try again
+     // to establish redundancy (if we need to)
+     BlackListListener retryOnRemoval = new BlackListListenerAdapter() {
+       @Override
+       public void serverRemoved(ServerLocation location) {
+         QueueManagerImpl.this.scheduleRedundancySatisfierIfNeeded(0);
+       }
+     };
+     blackList.addListener(retryOnRemoval);
+     factory.getBlackList().addListener(retryOnRemoval);
+   } finally {
+     initializedLatch.countDown();
+   }
+ }
+
+
+
+ public void readyForEvents(InternalDistributedSystem system) {
+ synchronized(lock) {
+ this.sentClientReady = true;
+ }
+
+ QueueConnectionImpl primary = null;
+ while (primary == null) {
+ try {
+ primary = (QueueConnectionImpl) getAllConnections().getPrimary();
+ } catch(NoSubscriptionServersAvailableException e) {
+ primary = null;
+ break;
+ }
+ if(primary.sendClientReady()) {
+ try {
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.QueueManagerImpl_SENDING_READY_FOR_EVENTS_TO_PRIMARY_0,
+ primary));
+ ReadyForEventsOp.execute(pool, primary);
+ } catch(Exception e) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Error sending ready for events to {}", primary, e);
+ }
+ primary.destroy();
+ primary = null;
+ }
+ }
+ }
+ }
+
+ public void readyForEventsAfterFailover(QueueConnectionImpl primary) {
+ try {
+ logger.info(LocalizedMessage.create(
+ LocalizedStrings.QueueManagerImpl_SENDING_READY_FOR_EVENTS_TO_PRIMARY_0, primary));
+ ReadyForEventsOp.execute(pool, primary);
+ } catch(Exception e) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Error sending ready for events to {}", primary, e);
+ }
+ primary.destroy();
+ }
+ }
+
+ /**
+  * Handles a crashed connection by treating its endpoint as crashed. The
+  * endpoint itself has not necessarily failed, but
+  * {@link #endpointCrashed(Endpoint)} performs all the work needed here.
+  *
+  * @param con the connection that crashed
+  */
+ void connectionCrashed(Connection con) {
+   Endpoint affected = con.getEndpoint();
+   endpointCrashed(affected);
+ }
+
+ void endpointCrashed(Endpoint endpoint) {
+ QueueConnectionImpl deadConnection = null;
+ //We must be synchronized while checking to see if we have a queue connection for the endpoint,
+ //because when we need to prevent a race between adding a queue connection to the map
+ //and the endpoint for that connection crashing.
+ synchronized (lock) {
+ deadConnection = queueConnections.getConnection(endpoint);
+ if (deadConnection != null) {
+ queueConnections = queueConnections.removeConnection(deadConnection);
+ }
+ }
+ if (deadConnection != null) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_SUBSCRIPTION_ENDPOINT_CRASHED_SCHEDULING_RECOVERY,
+ new Object[]{deadConnection.getUpdater() != null ?(deadConnection.getUpdater().isPrimary()? "Primary" : "Redundant") : "Queue", endpoint}));
+ scheduleRedundancySatisfierIfNeeded(0);
+ deadConnection.internalDestroy();
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Ignoring crashed endpoint {} it does not have a queue.", endpoint);
+ }
+ }
+ }
+ /**
+ * This method checks whether queue connection exist on this endpoint or not.
+ * if its there then it just destroys connection as clientUpdate thread is not there to read that connection.
+ */
+ public void checkEndpoint(ClientUpdater ccu, Endpoint endpoint)
+ {
+ QueueConnectionImpl deadConnection = null;
+
+ synchronized (lock) {
+ if(shuttingDown)
+ return;
+ //if same client updater then only remove as we don't know whether it has created new updater/connection on same endpoint or not..
+ deadConnection = queueConnections.getConnection(endpoint);
+ if (deadConnection != null && ccu.equals(deadConnection.getUpdater())) {
+ queueConnections = queueConnections.removeConnection(deadConnection);
+ deadConnection.internalDestroy();
+ }
+ }
+
+ logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_CACHE_CLIENT_UPDATER_FOR_ON_ENDPOINT_EXITING_SCHEDULING_RECOVERY,
+ new Object[]{(deadConnection != null && deadConnection.getUpdater() != null)?(deadConnection.getUpdater().isPrimary()? "Primary" : "Redundant"): "Queue", endpoint}));
+ scheduleRedundancySatisfierIfNeeded(0);//one more chance
+ }
+
+ /**
+  * Opens the initial set of subscription queue connections.
+  *
+  * Steps, in order:
+  * 1. ask the connection source for queue servers, excluding blacklisted ones;
+  * 2. connect to each and sort the connections into servers that already
+  *    host a queue for this client (primary or redundant) and servers that
+  *    do not;
+  * 3. register the backups, then the chosen primary;
+  * 4. if redundancy or the primary is still missing, fall back to
+  *    recoverRedundancy, createNewPrimary and finally promotion of a backup;
+  * 5. record primary-discovery failure (and notify CQs) if no primary exists.
+  *
+  * The "no queue servers available" warning is now wrapped in
+  * LocalizedMessage.create(...) like every other localized log call in this
+  * class; previously the raw StringId was handed to the logger.
+  *
+  * @throws GemFireSecurityException rethrown unchanged from connection creation
+  */
+ private void initializeConnections() {
+   final boolean isDebugEnabled = logger.isDebugEnabled();
+   if (isDebugEnabled) {
+     logger.debug("SubscriptionManager - intitializing connections");
+   }
+
+   // one primary plus redundancyLevel backups; -1 is passed through unchanged
+   int queuesNeeded = redundancyLevel == -1 ? -1 : redundancyLevel + 1;
+   Set excludedServers = new HashSet(blackList.getBadServers());
+   List servers = findQueueServers(excludedServers, queuesNeeded, true, false, null);
+
+   if (servers == null || servers.isEmpty()) {
+     logger.warn(LocalizedMessage.create(
+         LocalizedStrings.QueueManagerImpl_COULD_NOT_CREATE_A_QUEUE_NO_QUEUE_SERVERS_AVAILABLE));
+     scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);
+     synchronized (lock) {
+       queueConnections = queueConnections.setPrimaryDiscoveryFailed(null);
+       // wake threads blocked in getAllConnections()
+       lock.notifyAll();
+     }
+     return;
+   }
+
+   if (isDebugEnabled) {
+     logger.debug("SubscriptionManager - discovered subscription servers {}", servers);
+   }
+
+   SortedMap/* <ServerQueueStatus,Connection> */ oldQueueServers = new TreeMap(
+       QSIZE_COMPARATOR);
+   List nonRedundantServers = new ArrayList();
+
+   for (Iterator itr = servers.iterator(); itr.hasNext();) {
+     ServerLocation server = (ServerLocation) itr.next();
+     Connection connection = null;
+     try {
+       connection = factory.createClientToServerConnection(server, true);
+     } catch (GemFireSecurityException e) {
+       throw e;
+     } catch (Exception e) {
+       // best effort: a server we cannot reach is simply skipped
+       if (isDebugEnabled) {
+         logger.debug("SubscriptionManager - Error connected to server: {}", server, e);
+       }
+     }
+     if (connection != null) {
+       ServerQueueStatus status = connection.getQueueStatus();
+       if (status.isRedundant() || status.isPrimary()) {
+         oldQueueServers.put(status, connection);
+       } else {
+         nonRedundantServers.add(connection);
+       }
+     }
+   }
+
+   // This ordering was determined from the old ConnectionProxyImpl code
+   //
+   // initialization order of the new redundant and primary server is
+   // old redundant w/ second largest queue
+   // old redundant w/ third largest queue
+   // ...
+   // old primary
+   // non redundants in no particular order
+   //
+   // The primary is then chosen as
+   // redundant with the largest queue
+   // primary if there are no redundants
+   // a non redundant
+
+   // if the redundant with the largest queue fails, then we go and
+   // make a new server a primary.
+
+   Connection newPrimary = null;
+   if (!oldQueueServers.isEmpty()) {
+     newPrimary = (Connection) oldQueueServers.remove(oldQueueServers
+         .lastKey());
+   } else if (!nonRedundantServers.isEmpty()) {
+     newPrimary = (Connection) nonRedundantServers.remove(0);
+   }
+
+   // remaining old queue servers are initialized ahead of the non-redundant ones
+   nonRedundantServers.addAll(0, oldQueueServers.values());
+
+   for (Iterator itr = nonRedundantServers.iterator(); itr.hasNext();) {
+     Connection connection = (Connection) itr.next();
+     QueueConnectionImpl queueConnection = initializeQueueConnection(
+         connection, false, null);
+     if (queueConnection != null) {
+       addToConnectionList(queueConnection, false);
+     }
+   }
+
+   QueueConnectionImpl primaryQueue = null;
+   if (newPrimary != null) {
+     primaryQueue = initializeQueueConnection(newPrimary, true, null);
+     if (primaryQueue == null) {
+       newPrimary.destroy();
+     } else {
+       if (!addToConnectionList(primaryQueue, true)) {
+         primaryQueue = null;
+       }
+     }
+   }
+
+   // everything we already tried must not be offered again below
+   excludedServers.addAll(servers);
+
+   // Make sure we have enough redundant copies. Some of the connections may
+   // have failed above.
+   if (redundancyLevel != -1 && getCurrentRedundancy() < redundancyLevel) {
+     if (isDebugEnabled) {
+       logger.debug("SubscriptionManager - Some initial connections failed. Trying to create redundant queues");
+     }
+     recoverRedundancy(excludedServers, false);
+   }
+
+   if (redundancyLevel != -1 && primaryQueue == null) {
+     if (isDebugEnabled) {
+       logger.debug("SubscriptionManager - Intial primary creation failed. Trying to create a new primary");
+     }
+     while (primaryQueue == null) {
+       primaryQueue = createNewPrimary(excludedServers);
+       if (primaryQueue == null) {
+         // couldn't find a server to make primary
+         break;
+       }
+       if (!addToConnectionList(primaryQueue, true)) {
+         excludedServers.add(primaryQueue.getServer());
+         primaryQueue = null;
+       }
+     }
+   }
+
+   if (primaryQueue == null) {
+     if (isDebugEnabled) {
+       logger.debug("SubscriptionManager - Unable to create a new primary queue, using one of the redundant queues");
+     }
+     while (primaryQueue == null) {
+       primaryQueue = promoteBackupToPrimary(queueConnections.getBackups());
+       if (primaryQueue == null) {
+         // no backup servers available
+         break;
+       }
+       if (!addToConnectionList(primaryQueue, true)) {
+         synchronized (lock) {
+           // make sure we don't retry this same connection.
+           queueConnections = queueConnections.removeConnection(primaryQueue);
+         }
+         primaryQueue = null;
+       }
+     }
+   }
+
+   if (primaryQueue == null) {
+     logger.error(LocalizedMessage.create(
+         LocalizedStrings.QueueManagerImpl_COULD_NOT_INITIALIZE_A_PRIMARY_QUEUE_ON_STARTUP_NO_QUEUE_SERVERS_AVAILABLE));
+     synchronized (lock) {
+       queueConnections = queueConnections.setPrimaryDiscoveryFailed(
+           new NoSubscriptionServersAvailableException(LocalizedStrings.QueueManagerImpl_COULD_NOT_INITIALIZE_A_PRIMARY_QUEUE_ON_STARTUP_NO_QUEUE_SERVERS_AVAILABLE.toLocalizedString()));
+       lock.notifyAll();
+     }
+     cqsDisconnected();
+   } else {
+     cqsConnected();
+   }
+
+   if (getCurrentRedundancy() < redundancyLevel) {
+     logger.warn(LocalizedMessage.create(
+         LocalizedStrings.QueueManagerImpl_UNABLE_TO_INITIALIZE_ENOUGH_REDUNDANT_QUEUES_ON_STARTUP_THE_REDUNDANCY_COUNT_IS_CURRENTLY_0,
+         getCurrentRedundancy()));
+   }
+ }
+
+ /**
+  * Tells the cache's CQ service that a primary queue is available for this
+  * pool. Does nothing when there is no cache instance.
+  */
+ private void cqsConnected() {
+   GemFireCacheImpl currentCache = GemFireCacheImpl.getInstance();
+   if (currentCache == null) {
+     return;
+   }
+   // Primary queue was found, alert the affected cqs if necessary
+   currentCache.getCqService().cqsConnected(pool);
+ }
+
+ /**
+  * Tells the cache's CQ service that no primary queue is available for this
+  * pool. Does nothing when there is no cache instance.
+  */
+ private void cqsDisconnected() {
+   GemFireCacheImpl currentCache = GemFireCacheImpl.getInstance();
+   if (currentCache == null) {
+     return;
+   }
+   // No primary queue was found, alert the affected cqs if necessary
+   currentCache.getCqService().cqsDisconnected(pool);
+ }
+
+ /**
+  * @return the number of backup connections in the current connection list
+  */
+ private int getCurrentRedundancy() {
+   List currentBackups = queueConnections.getBackups();
+   return currentBackups.size();
+ }
+
+ /**
+  * Make sure that we have enough backup servers.
+  *
+  * Repeatedly asks for more queue servers and tries to create a backup queue
+  * on each, until the redundancy level is met, no more servers are offered or
+  * a pool/cache cancellation is in progress. Every server that was tried is
+  * added to the excluded servers set, whether or not the attempt succeeded.
+  *
+  * The debug message for a failed connection attempt now uses the "{}"
+  * placeholder; it previously contained "()" and so never printed the server.
+  *
+  * @param excludedServers servers not to try; updated in place
+  * @param recoverInterest whether interest is registered on each new queue
+  * @return {@code false} if the connection source had no more servers to
+  *         offer, {@code true} otherwise (including when cancelled)
+  * @throws GemFireSecurityException rethrown unchanged from connection creation
+  */
+ protected boolean recoverRedundancy(Set excludedServers, boolean recoverInterest) {
+   if (pool.getPoolOrCacheCancelInProgress() != null) {
+     return true;
+   }
+   int additionalBackups;
+   while (pool.getPoolOrCacheCancelInProgress() == null
+       && ((additionalBackups = redundancyLevel - getCurrentRedundancy()) > 0
+           || redundancyLevel == -1)) {
+
+     if (redundancyLevel != -1 && printRecoveringRedundant) {
+       logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_SUBSCRIPTION_MANAGER_REDUNDANCY_SATISFIER_REDUNDANT_ENDPOINT_HAS_BEEN_LOST_ATTEMPTIMG_TO_RECOVER));
+       printRecoveringRedundant = false;
+     }
+
+     List servers = findQueueServers(excludedServers, redundancyLevel == -1 ? -1 : additionalBackups, false,
+         (redundancyLevel == -1 ? false : printRedundancyNotSatisfiedError),
+         LocalizedStrings.QueueManagerImpl_COULD_NOT_FIND_SERVER_TO_CREATE_REDUNDANT_CLIENT_QUEUE);
+
+     if (servers == null || servers.isEmpty()) {
+       if (redundancyLevel != -1) {
+         if (printRedundancyNotSatisfiedError) {
+           logger.info(LocalizedMessage.create(
+               LocalizedStrings.QueueManagerImpl_REDUNDANCY_LEVEL_0_IS_NOT_SATISFIED_BUT_THERE_ARE_NO_MORE_SERVERS_AVAILABLE_REDUNDANCY_IS_CURRENTLY_1,
+               new Object[] { Integer.valueOf(redundancyLevel), Integer.valueOf(getCurrentRedundancy())}));
+         }
+       }
+       printRedundancyNotSatisfiedError = false; // printed above
+       return false;
+     }
+     // never offer these servers again during this recovery pass
+     excludedServers.addAll(servers);
+
+     final boolean isDebugEnabled = logger.isDebugEnabled();
+     for (Iterator itr = servers.iterator(); itr.hasNext();) {
+       ServerLocation server = (ServerLocation) itr.next();
+       Connection connection = null;
+       try {
+         connection = factory.createClientToServerConnection(server, true);
+       } catch (GemFireSecurityException e) {
+         throw e;
+       } catch (Exception e) {
+         // best effort: an unreachable server is skipped
+         if (isDebugEnabled) {
+           logger.debug("SubscriptionManager - Error connecting to server: {}", server, e);
+         }
+       }
+       if (connection == null) {
+         continue;
+       }
+
+       QueueConnectionImpl queueConnection = initializeQueueConnection(
+           connection, false, null);
+       if (queueConnection != null) {
+         boolean isFirstNewConnection = false;
+         synchronized (lock) {
+           if (recoverInterest && queueConnections.getPrimary() == null
+               && queueConnections.getBackups().isEmpty()) {
+             // we lost our queue at some point. We Need to recover
+             // interest. This server will be made primary after this method
+             // finishes because whoever killed the primary when this method
+             // started should have scheduled a task to recover the primary.
+             isFirstNewConnection = true;
+             // TODO - Actually, we need a better check than the above. There's
+             // still a chance that we haven't realized that the primary has
+             // died but it is already gone. We should get some information
+             // from the queue server about whether it was able to copy the
+             // queue from another server and decide if we need to recover our
+             // interest based on that information.
+           }
+         }
+         boolean promotionFailed = false;
+         if (isFirstNewConnection) {
+           if (!promoteBackupCnxToPrimary(queueConnection)) {
+             promotionFailed = true;
+           }
+         }
+         if (!promotionFailed) {
+           if (addToConnectionList(queueConnection, isFirstNewConnection)) {
+             // redundancy satisfied
+             printRedundancyNotSatisfiedError = true;
+             printRecoveringRedundant = true;
+             if (logger.isDebugEnabled()) {
+               logger.debug("SubscriptionManager redundancy satisfier - created a queue on server {}", queueConnection.getEndpoint());
+             }
+             // Even though the new redundant queue will usually recover
+             // subscription information (see bug #39014) from its initial
+             // image provider, in bug #42280 we found that this is not always
+             // the case, so clients must always register interest with the new
+             // redundant server.
+             if (recoverInterest) {
+               recoverInterest(queueConnection, isFirstNewConnection);
+             }
+           }
+         }
+       }
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Tries each backup in list order and returns the first one that could be
+  * promoted to primary.
+  *
+  * @param backups candidate backup connections
+  * @return the promoted connection, or {@code null} if none could be promoted
+  */
+ private QueueConnectionImpl promoteBackupToPrimary(List backups) {
+   for (int idx = 0; idx < backups.size(); idx++) {
+     QueueConnectionImpl candidate = (QueueConnectionImpl) backups.get(idx);
+     if (promoteBackupCnxToPrimary(candidate)) {
+       return candidate;
+     }
+   }
+   return null;
+ }
+
+ private boolean promoteBackupCnxToPrimary(QueueConnectionImpl cnx) {
+ boolean result = false;
+ if (PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG) {
+ BridgeObserver bo = BridgeObserverHolder.getInstance();
+ bo.beforePrimaryIdentificationFromBackup();
+ }
+ try {
+ boolean haveSentClientReady = this.sentClientReady;
+ if(haveSentClientReady) {
+ cnx.sendClientReady();
+ }
+ ClientUpdater updater = cnx.getUpdater();
+ if(updater == null) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("backup connection was destroyed before it could become the primary.");
+ }
+ Assert.assertTrue(cnx.isDestroyed());
+ } else {
+ updater.setFailedUpdater(queueConnections.getFailedUpdater());
+ MakePrimaryOp.execute(pool, cnx, haveSentClientReady);
+ result = true;
+ if (PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG) {
+ BridgeObserver bo = BridgeObserverHolder.getInstance();
+ bo.afterPrimaryIdentificationFromBackup(cnx.getServer());
+ }
+ }
+ } catch (Exception e) {
+ if(pool.getPoolOrCacheCancelInProgress() == null && logger.isDebugEnabled()) {
+ logger.debug("Error making a backup server the primary server for client subscriptions", e);
+ }
+ }
+ return result;
+ }
+ /**
+  * Create a new primary server from a non-redundant server.
+  *
+  * Asks for one server at a time and tries to open a primary queue on it,
+  * until one succeeds, no server is offered or a pool/cache cancellation is
+  * in progress. Every server tried is added to the excludedServers set. If
+  * the client has already declared itself ready for events, the new primary
+  * is told so as well.
+  *
+  * The debug log for a failed connection attempt now includes the caught
+  * exception, matching the other connection-failure logs in this class;
+  * previously the cause was dropped.
+  *
+  * @param excludedServers servers not to try; updated in place
+  * @return the new primary queue connection, or {@code null} if none could be created
+  * @throws GemFireSecurityException rethrown unchanged from connection creation
+  */
+ private QueueConnectionImpl createNewPrimary(Set excludedServers) {
+   QueueConnectionImpl primary = null;
+   while (primary == null && pool.getPoolOrCacheCancelInProgress() == null) {
+     List servers = findQueueServers(excludedServers, 1, false,
+         printPrimaryNotFoundError,
+         LocalizedStrings.QueueManagerImpl_COULD_NOT_FIND_SERVER_TO_CREATE_PRIMARY_CLIENT_QUEUE);
+     printPrimaryNotFoundError = false; // printed above
+     if (servers == null || servers.isEmpty()) {
+       break;
+     }
+
+     Connection connection = null;
+     try {
+       connection = factory
+           .createClientToServerConnection((ServerLocation) servers.get(0), true);
+     } catch (GemFireSecurityException e) {
+       throw e;
+     } catch (Exception e) {
+       // best effort: fall through and try the next server
+       if (logger.isDebugEnabled()) {
+         logger.debug("SubscriptionManagerImpl - error creating a connection to server {}", servers.get(0), e);
+       }
+     }
+     if (connection != null) {
+       primary = initializeQueueConnection(connection, true, queueConnections.getFailedUpdater());
+     }
+     excludedServers.addAll(servers);
+   }
+
+   if (primary != null && sentClientReady && primary.sendClientReady()) {
+     readyForEventsAfterFailover(primary);
+   }
+   return primary;
+ }
+
+ private List findQueueServers(Set excludedServers, int count,
+ boolean findDurable, boolean printErrorMessage, StringId msgId) {
+ List servers = null;
+ Exception ex = null;
+ try {
+ if(pool.getPoolOrCacheCancelInProgress()!=null) {
+ return null;
+ }
+ servers = source.findServersForQueue(excludedServers, count, proxyId, findDurable);
+ } catch(GemFireSecurityException e) {
+ //propagate the security exception immediately.
+ throw e;
+ } catch (Exception e) {
+ /*logger
+ .warning(
+ LocalizedStrings.QueueManagerImpl_COULD_NOT_RETRIEVE_LIST_OF_SERVERS_FOR_SUBSCRIPTION_0,
+ new Object[] { e.getMessage() });*/
+ ex = e;
+ if (logger.isDebugEnabled()) {
+ logger.debug("SubscriptionManager - Error getting the list of servers: {}", e);
+ }
+ }
+
+ if(printErrorMessage)
+ {
+ if(servers == null || servers.isEmpty())
+ {
+ logger.error(LocalizedMessage.create(msgId,
+ new Object[]{ (excludedServers!= null?excludedServers.size(): 0), (ex != null?ex.getMessage(): "no exception")}));
+ }
+ }
+ return servers;
+ }
+
+ /**
+ * Find a new primary, adding any failed servers we encounter to the excluded
+ * servers list
+ *
+ * First we try to make a backup server the primary, but if run out of backup
+ * servers we will try to find a new server.
+ *
+ * Returns immediately when a pool/cache cancellation is in progress or a
+ * primary already exists. When no primary can be recovered, the CQs are told
+ * they are disconnected, primary discovery is marked as failed and threads
+ * waiting on the lock are woken.
+ *
+ * @param excludedServers servers not to try; updated in place
+ */
+ protected void recoverPrimary(Set excludedServers) {
+ if(pool.getPoolOrCacheCancelInProgress() != null) {
+ return;
+ }
+ final boolean isDebugEnabled = logger.isDebugEnabled();
+ if (queueConnections.getPrimary() != null) {
+ if (isDebugEnabled) {
+ logger.debug("Primary recovery not needed");
+ }
+ return;
+ }
+
+ if (isDebugEnabled) {
+ logger.debug("SubscriptionManager redundancy satisfier - primary endpoint has been lost. Attempting to recover");
+ }
+
+ // Log the "attempting to recover" message once until a recovery succeeds.
+ if(printRecoveringPrimary) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_SUBSCRIPTION_MANAGER_REDUNDANCY_SATISFIER_PRIMARY_ENDPOINT_HAS_BEEN_LOST_ATTEMPTIMG_TO_RECOVER));
+ printRecoveringPrimary = false;
+ }
+
+ // Phase 1: try to promote one of the existing backups.
+ QueueConnectionImpl newPrimary = null;
+ while(newPrimary == null && pool.getPoolOrCacheCancelInProgress()==null) {
+ List backups = queueConnections.getBackups();
+ newPrimary = promoteBackupToPrimary(backups);
+ // The promoted server may have crashed in the meantime; that case is
+ // handled by the addToConnectionList check below.
+ if(newPrimary == null) {
+ //could not find a backup to promote
+ break;
+ }
+ if(!addToConnectionList(newPrimary, true)) {
+ synchronized(lock) {
+ //make sure we don't retry the same backup server
+ queueConnections = queueConnections.removeConnection(newPrimary);
+ }
+ newPrimary = null;
+ }
+
+ }
+
+ if(newPrimary != null) {
+ if (isDebugEnabled) {
+ logger.debug("SubscriptionManager redundancy satisfier - Switched backup server to primary: {}", newPrimary.getEndpoint());
+ }
+ if (PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG) {
+ BridgeObserver bo = BridgeObserverHolder.getInstance();
+ bo.afterPrimaryRecovered(newPrimary.getServer());
+ }
+
+ //new primary from back up server was found, alert affected cqs if necessary
+ cqsConnected();
+ // re-arm the throttled log messages for the next failure
+ printPrimaryNotFoundError = true;
+ printRecoveringPrimary =true;
+ return;
+ }
+
+ // Phase 2: no backup could be promoted, so create a primary on a new server.
+ // NOTE(review): this loop body ends in an unconditional return, so it runs at
+ // most once; if addToConnectionList or recoverInterest fails, the method
+ // returns without retrying and without reaching the failure handling below.
+ // Confirm whether that is intended.
+ while(newPrimary == null) {
+ newPrimary = createNewPrimary(excludedServers);
+ if(newPrimary == null) {
+ //could not find a new primary to create
+ break;
+ }
+ if(!addToConnectionList(newPrimary, true)) {
+ excludedServers.add(newPrimary.getServer());
+ newPrimary = null;
+ }
+
+ if (newPrimary != null) {
+ if (isDebugEnabled) {
+ logger.debug("SubscriptionManager redundancy satisfier - Non backup server was made primary. Recovering interest {}", newPrimary.getEndpoint());
+ }
+
+ if(!recoverInterest(newPrimary, true)) {
+ excludedServers.add(newPrimary.getServer());
+ newPrimary = null;
+ }
+ //New primary queue was found from a non backup, alert the affected cqs
+ // NOTE(review): cqsConnected() also runs when recoverInterest just failed
+ // and newPrimary was reset to null - verify.
+ cqsConnected();
+ }
+
+ if (newPrimary != null && PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG) {
+ BridgeObserver bo = BridgeObserverHolder.getInstance();
+ bo.afterPrimaryRecovered(newPrimary.getServer());
+ }
+ printPrimaryNotFoundError = true;
+ printRecoveringPrimary = true;
+ return;
+ }
+ // Reached only when createNewPrimary returned null (the break above).
+ //No primary queue was found, alert the affected cqs
+ cqsDisconnected();
+ if (isDebugEnabled) {
+ logger.debug("SubscriptionManager redundancy satisfier - Could not recover a new primary");
+ }
+ synchronized (lock) {
+ queueConnections = queueConnections.setPrimaryDiscoveryFailed(null);
+ // wake threads blocked in getAllConnections() so they can see the failure
+ lock.notifyAll();
+ }
+ }
+
+ private QueueConnectionImpl initializeQueueConnection(Connection connection,
+ boolean isPrimary, ClientUpdater failedUpdater) {
+ QueueConnectionImpl queueConnection = null;
+ FailureTracker failureTracker = blackList.getFailureTracker(connection.getServer());
+ try {
+ ClientUpdater updater = factory.createServerToClientConnection(connection
+ .getEndpoint(), this, isPrimary, failedUpdater);
+ if (updater != null) {
+ queueConnection = new QueueConnectionImpl(this, connection, updater, failureTracker);
+ } else {
+ logger.warn(LocalizedMessage.create(
+ LocalizedStrings.QueueManagerImpl_UNABLE_TO_CREATE_A_SUBSCRIPTION_CONNECTION_TO_SERVER_0,
+ connection.getEndpoint()));
+ }
+ } catch (Exception e) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("error creating subscription connection to server {}", connection.getEndpoint(), e);
+ }
+ }
+ if (queueConnection == null) {
+ failureTracker.addFailure();
+ connection.destroy();
+ }
+ return queueConnection;
+ }
+ //need flag whether primary is created from backup
+ // for backuup queue lets assume before we add connection, endpoint crashed, now we put in connection but CCU may died as endpoint closed....
+ // so before putting connection need to see if something(crash) happen we should be able to recover from it
+ private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
+ boolean isBadConnection;
+ synchronized(lock) {
+ ClientUpdater cu = connection.getUpdater();
+ if(cu == null || (! cu.isAlive() ) || (!cu.isProcessing()) )
+ return false;//don't add
+ //now still CCU can died but then it will execute Checkendpoint with lock it will remove connection connection and it will reschedule it.
+ if(connection.getEndpoint().isClosed() || shuttingDown || pool.getPoolOrCacheCancelInProgress()!=null) {
+ isBadConnection = true;
+ } else {
+ isBadConnection = false;
+ if(isPrimary) {
+ queueConnections = queueConnections.setPrimary(connection);
+ lock.notifyAll();
+ } else {
+ queueConnections = queueConnections.addBackup(connection);
+ }
+ }
+ }
+
+ if(isBadConnection) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("Endpoint {} crashed while creating a connection. The connection will be destroyed", connection.getEndpoint());
+ }
+ connection.internalDestroy();
+ }
+
+ return !isBadConnection;
+ }
+
+ protected void scheduleRedundancySatisfierIfNeeded(long delay) {
+ if(shuttingDown) {
+ return;
+ }
+
+ synchronized (lock) {
+ if(shuttingDown) {
+ return;
+ }
+ if (queueConnections.getPrimary() == null
+ || getCurrentRedundancy() < redundancyLevel || redundancyLevel == -1
+ || queueConnections.primaryDiscoveryFailed()) {
+ if (redundancySatisfierTask != null) {
+ if (redundancySatisfierTask.getRemainingDelay() > delay) {
+ redundancySatisfierTask.cancel();
+ } else {
+ return;
+ }
+ }
+
+ redundancySatisfierTask = new RedundancySatisfierTask();
+ try {
+ ScheduledFuture future = recoveryThread.schedule(redundancySatisfierTask,
+ delay, TimeUnit.MILLISECONDS);
+ redundancySatisfierTask.setFuture(future);
+ } catch(RejectedExecutionException e) {
+ //ignore, the timer has been cancelled, which means we're shutting down.
+ }
+ }
+ }
+ }
+
+
+  /**
+   * Re-registers all interest on a new queue connection.
+   *
+   * @param newConnection the connection to recover interest on
+   * @param isFirstNewConnection passed through to recoverAllInterestTypes
+   * @return false if recovery failed, in which case a failure has been
+   *         recorded and the connection destroyed; true otherwise, including
+   *         when a cancel is in progress
+   */
+  private boolean recoverInterest(final QueueConnectionImpl newConnection,
+      final boolean isFirstNewConnection) {
+
+    if (pool.getPoolOrCacheCancelInProgress() != null) {
+      return true;
+    }
+
+    boolean recovered;
+    try {
+      recoverAllInterestTypes(newConnection, isFirstNewConnection);
+      newConnection.getFailureTracker().reset();
+      recovered = true;
+    } catch (CancelException ignore) {
+      // ok to ignore we are being shutdown
+      recovered = true;
+    } catch (VirtualMachineError err) {
+      SystemFailure.initiateFailure(err);
+      // If this ever returns, rethrow the error. We're poisoned
+      // now, so don't let this thread continue.
+      throw err;
+    } catch (Throwable failure) {
+      SystemFailure.checkFailure();
+      pool.getCancelCriterion().checkCancelInProgress(failure);
+      logger.warn(LocalizedMessage.create(
+          LocalizedStrings.QueueManagerImpl_QUEUEMANAGERIMPL_FAILED_TO_RECOVER_INTEREST_TO_SERVER_0,
+          newConnection.getServer()), failure);
+      newConnection.getFailureTracker().addFailure();
+      newConnection.destroy();
+      recovered = false;
+    }
+    return recovered;
+  }
+
+  /**
+   * @return the queue state held by this manager
+   */
+  public QueueState getState() {
+    return state;
+  }
+
+ private void recoverSingleList(int interestType, Connection recoveredConnection,
+ boolean isDurable, boolean receiveValues, boolean isFirstNewConnection) {
+ Iterator i = this.getPool().getRITracker()
+ .getRegionToInterestsMap(interestType, isDurable, !receiveValues).values().iterator();
+ while (i.hasNext()) { // restore a region
+ RegionInterestEntry e = (RegionInterestEntry) i.next();
+ recoverSingleRegion(e.getRegion(), e.getInterests(), interestType,
+ recoveredConnection, isDurable, receiveValues, isFirstNewConnection);
+ } // restore a region
+ }
+
+ private void recoverCqs(Connection recoveredConnection, boolean isDurable) {
+ Map cqs = this.getPool().getRITracker().getCqsMap();
+ Iterator i = cqs.entrySet().iterator();
+ while(i.hasNext()) {
+ Map.Entry e = (Map.Entry)i.next();
+ ClientCQ cqi = (ClientCQ)e.getKey();
+ String name = cqi.getName();
+ if (this.pool.getMultiuserAuthentication()) {
+ UserAttributes.userAttributes.set(((DefaultQueryService)this.pool
+ .getQueryService()).getUserAttributes(name));
+ }
+ try {
+ if (((CqStateImpl)cqi.getState()).getState() != CqStateImpl.INIT) {
+ cqi.createOn(recoveredConnection, isDurable);
+ }
+ } finally {
+ UserAttributes.userAttributes.set(null);
+ }
+ }
+ }
+
+ // TODO this is distressingly similar to LocalRegion#processSingleInterest
+ private void recoverSingleRegion(LocalRegion r, Map keys, int interestType,
+ Connection recoveredConnection, boolean isDurable,
+ boolean receiveValues, boolean isFirstNewConnection) {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}.recoverSingleRegion starting kind={} region={}: {}", this, InterestType.getString(interestType), r.getFullPath(), keys);
+ }
+
+ // build a HashMap, key is policy, value is list
+ HashMap policyMap = new HashMap();
+ Iterator keysIter = keys.entrySet().iterator();
+ while (keysIter.hasNext()) { // restore and commit an interest
+ Map.Entry me = (Map.Entry) keysIter.next();
+ Object key = me.getKey();
+ InterestResultPolicy pol = (InterestResultPolicy) me.getValue();
+
+ if (interestType == InterestType.KEY) {
+ // Gester: we only consolidate the key into list for InterestType.KEY
+ LinkedList keyList = (LinkedList)policyMap.get(pol);
+ if (keyList == null) {
+
+ keyList = new LinkedList();
+ }
+ keyList.add(key);
+ policyMap.put(pol, keyList);
+ } else {
+ // for other Interest type, do it one by one
+ recoverSingleKey(r, key, pol, interestType, recoveredConnection,
+ isDurable, receiveValues, isFirstNewConnection);
+ }
+ }
+
+ // Process InterestType.KEY: Iterator list for each each policy
+ Iterator polIter = policyMap.entrySet().iterator();
+ while (polIter.hasNext()) {
+ Map.Entry me = (Map.Entry) polIter.next();
+ LinkedList keyList = (LinkedList)me.getValue();
+ InterestResultPolicy pol = (InterestResultPolicy)me.getKey();
+ recoverSingleKey(r, keyList, pol, interestType, recoveredConnection,
+ isDurable, receiveValues, isFirstNewConnection);
+ }
+ }
+
+ private void recoverSingleKey(LocalRegion r, Object keys,
+ InterestResultPolicy policy, int interestType, Connection recoveredConnection,
+ boolean isDurable, boolean receiveValues, boolean isFirstNewConnection) {
+ r.startRegisterInterest();
+ try {
+ // Remove all matching values from local cache
+ if (isFirstNewConnection) { // only if this recoveredEP
+ // becomes primaryEndpoint
+ r.clearKeysOfInterest(keys, interestType, policy);
+ if (logger.isDebugEnabled()) {
+ logger.debug("{}.recoverSingleRegion :Endpoint recovered is primary so clearing the keys of interest starting kind={} region={}: {}", this, InterestType.getString(interestType), r.getFullPath(), keys);
+ }
+ }
+ // Register interest, get new values back
+ List serverKeys;
+ if (policy != InterestResultPolicy.KEYS_VALUES) {
+ serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection,
+ keys, interestType, policy, isDurable, !receiveValues,
+ r.getAttributes().getDataPolicy().ordinal);
+ // Restore keys based on server's response
+ if (isFirstNewConnection) {
+ // only if this recoveredEP becomes primaryEndpoint
+ r.refreshEntriesFromServerKeys(recoveredConnection, serverKeys, policy);
+ }
+ } else {
+ if (!isFirstNewConnection) {
+ // InterestResultPolicy.KEYS_VALUES now fetches values in
+ // RegisterInterestOp's response itself and in this case
+ // refreshEntriesFromServerKeys(...) does not explicitly fetch values
+ // but only updates keys-values to region. To not fetch values, we
+ // need to use policy NONE or KEYS.
+ serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection,
+ keys, interestType, InterestResultPolicy.NONE, isDurable, !receiveValues,
+ r.getAttributes().getDataPolicy().ordinal);
+ } else {
+ serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection,
+ keys, interestType, policy, isDurable, !receiveValues,
+ r.getAttributes().getDataPolicy().ordinal);
+ r.refreshEntriesFromServerKeys(recoveredConnection, serverKeys, policy);
+ }
+ }
+ } finally {
+ r.finishRegisterInterest();
+ }
+ }
+
+ private void recoverInterestList(final Connection recoveredConnection,
+ boolean durable, boolean receiveValues, boolean isFirstNewConnection) {
+ recoverSingleList(InterestType.KEY, recoveredConnection, durable, receiveValues, isFirstNewConnection);
+ recoverSingleList(InterestType.REGULAR_EXPRESSION, recoveredConnection, durable, receiveValues, isFirstNewConnection);
+ recoverSingleList(InterestType.FILTER_CLASS, recoveredConnection, durable, receiveValues, isFirstNewConnection);
+ recoverSingleList(InterestType.OQL_QUERY, recoveredConnection, durable, receiveValues, isFirstNewConnection);
+ // VJR: Recover CQs moved to recoverAllInterestTypes() to avoid multiple
+ // calls for receiveValues flag being true and false.
+ //recoverCqs(recoveredConnection, durable);
+ //recoverSingleList(InterestType.CQ, recoveredConnection, durable,isFirstNewConnection);
+ }
+
+  /**
+   * Recovers all non-durable interest and CQs on the given connection and,
+   * for a durable client, all durable interest and CQs as well.
+   */
+  protected void recoverAllInterestTypes(final Connection recoveredConnection,
+      boolean isFirstNewConnection) {
+    if (PoolImpl.BEFORE_RECOVER_INTERST_CALLBACK_FLAG) {
+      // test hook
+      BridgeObserverHolder.getInstance().beforeInterestRecovery();
+    }
+
+    // non-durable interest: once with values, once without, then the CQs
+    recoverInterestList(recoveredConnection, false, true, isFirstNewConnection);
+    recoverInterestList(recoveredConnection, false, false, isFirstNewConnection);
+    recoverCqs(recoveredConnection, false);
+
+    if (!getPool().isDurableClient()) {
+      return;
+    }
+
+    // durable interest, same sequence
+    recoverInterestList(recoveredConnection, true, true, isFirstNewConnection);
+    recoverInterestList(recoveredConnection, true, false, isFirstNewConnection);
+    recoverCqs(recoveredConnection, true);
+  }
+
+
+  /**
+   * A comparator which sorts queue statuses in this order: primaries first,
+   * then by ascending server queue size, and finally by member id to break
+   * ties between equally sized queues.
+   *
+   * The queue sizes are compared explicitly rather than by subtraction,
+   * because the difference of two ints can overflow and yield a result with
+   * the wrong sign.
+   *
+   * @author dsmith
+   *
+   */
+  protected static class QSizeComparator implements java.util.Comparator {
+    /**
+     * @param o1 a {@link ServerQueueStatus}
+     * @param o2 a {@link ServerQueueStatus}
+     * @return a negative value, zero or a positive value as o1 sorts before,
+     *         equal to or after o2
+     */
+    public int compare(Object o1, Object o2) {
+      final ServerQueueStatus s1 = (ServerQueueStatus) o1;
+      final ServerQueueStatus s2 = (ServerQueueStatus) o2;
+
+      // sort primaries to the front of the list
+      if (s1.isPrimary() && !s2.isPrimary()) {
+        return -1;
+      }
+      if (!s1.isPrimary() && s2.isPrimary()) {
+        return 1;
+      }
+
+      // overflow-safe comparison of the queue sizes
+      final int size1 = s1.getServerQueueSize();
+      final int size2 = s2.getServerQueueSize();
+      if (size1 != size2) {
+        return size1 < size2 ? -1 : 1;
+      }
+
+      return s1.getMemberId().compareTo(s2.getMemberId());
+    }
+  }
+
+ /**
+ * A data structure for holding the current set of connections the
+ * queueConnections reference should be maintained by making a copy of this
+ * data structure for each change.
+ *
+ * Note the the order of the backups is significant. The first backup in the
+ * list is the first server that will be become primary after the primary
+ * fails, etc.
+ *
+ * The order of backups in this list is the reverse of the order or endpoints
+ * from the old ConnectionProxyImpl .
+ */
+ public class ConnectionList implements QueueConnections {
+ private final QueueConnectionImpl primary;
+ private final Map/* <Endpoint, QueueConnection> */connectionMap;
+ private final List/* <QueueConnection> */backups;
+ /**
+ * The primaryDiscoveryException flag is stronger than just not having any
+ * queue connections It also means we tried all of the possible queue
+ * servers and we'ren't able to connect.
+ */
+ private final GemFireException primaryDiscoveryException;
+ private final QueueConnectionImpl failedPrimary;
+
+ public ConnectionList() {
+ primary = null;
+ connectionMap = Collections.EMPTY_MAP;
+ backups = Collections.EMPTY_LIST;
+ primaryDiscoveryException = null;
+ failedPrimary = null;
+ }
+
+ private ConnectionList(QueueConnectionImpl primary, List backups,
+ GemFireException discoveryException, QueueConnectionImpl failedPrimary) {
+ this.primary = primary;
+ Map allConnectionsTmp = new HashMap();
+ for (Iterator itr = backups.iterator(); itr.hasNext();) {
+ QueueConnectionImpl nextConnection = (QueueConnectionImpl) itr.next();
+ allConnectionsTmp.put(nextConnection.getEndpoint(), nextConnection);
+ }
+ if (primary != null) {
+ allConnectionsTmp.put(primary.getEndpoint(), primary);
+ }
+ this.connectionMap = Collections.unmodifiableMap(allConnectionsTmp);
+ this.backups = Collections.unmodifiableList(new ArrayList(backups));
+ pool.getStats().setSubscriptionCount(connectionMap.size());
+ this.primaryDiscoveryException = discoveryException;
+ this.failedPrimary = failedPrimary;
+ }
+
+ public ConnectionList setPrimary(QueueConnectionImpl newPrimary) {
+ List newBackups = backups;
+ if (backups.contains(newPrimary)) {
+ newBackups = new ArrayList(backups);
+ newBackups.remove(newPrimary);
+ }
+ return new ConnectionList(newPrimary, newBackups, null, null);
+ }
+
+ public ConnectionList setPrimaryDiscoveryFailed(
+ GemFireException p_discoveryException) {
+ GemFireException discoveryException = p_discoveryException;
+ if(discoveryException == null) {
+ discoveryException = new NoSubscriptionServersAvailableException("Primary discovery failed.");
+ }
+ return new ConnectionList(primary, backups, discoveryException, failedPrimary);
+ }
+
+ public ConnectionList addBackup(QueueConnectionImpl queueConnection) {
+ ArrayList newBackups = new ArrayList(backups);
+ newBackups.add(queueConnection);
+ return new ConnectionList(primary, newBackups, primaryDiscoveryException, failedPrimary);
+ }
+
+ public ConnectionList removeConnection(QueueConnectionImpl connection) {
+ if (primary == connection) {
+ return new ConnectionList(null, backups, primaryDiscoveryException, primary);
+ } else {
+ ArrayList newBackups = new ArrayList(backups);
+ newBackups.remove(connection);
+ return new ConnectionList(primary, newBackups, primaryDiscoveryException, failedPrimary);
+ }
+ }
+
+ public Connection getPrimary() {
+ return primary;
+ }
+
+ public List/* <QueueConnection> */getBackups() {
+ return backups;
+ }
+
+ /**
+ * Return the cache client updater from the previously
+ * failed primary
+ * @return the previous updater or null if there is no previous updater
+ */
+ public ClientUpdater getFailedUpdater() {
+ if(failedPrimary != null) {
+ return failedPrimary.getUpdater();
+ } else {
+ return null;
+ }
+ }
+
+ public boolean primaryDiscoveryFailed() {
+ return primaryDiscoveryException != null;
+ }
+
+ public GemFireException getPrimaryDiscoveryException() {
+ return primaryDiscoveryException;
+ }
+
+ public QueueConnectionImpl getConnection(Endpoint endpoint) {
+ return (QueueConnectionImpl) connectionMap.get(endpoint);
+ }
+
+ /** return a copy of the list of all server locations */
+ public Set/* <ServerLocation> */getAllLocations() {
+ HashSet locations = new HashSet();
+ for (Iterator itr = connectionMap.keySet().iterator(); itr.hasNext();) {
+ com.gemstone.gemfire.cache.client.internal.Endpoint endpoint = (com.gemstone.gemfire.cache.client.internal.Endpoint) itr.next();
+ locations.add(endpoint.getLocation());
+ }
+
+ return locations;
+ }
+ }
+
+  /**
+   * Logs an error. Security exceptions go to the security logger, everything
+   * else to the regular logger.
+   */
+  protected void logError(StringId message, Throwable t) {
+    if (!(t instanceof GemFireSecurityException)) {
+      logger.error(message, t);
+      return;
+    }
+    securityLogger.error(message, t);
+  }
+
+ /**
+ * Asynchronous task which tries to reestablish a primary connection and
+ * satisfy redundancy requirements.
+ *
+ * This task should only be running in a single thread at a time. This task is
+ * the only way that new queue servers will be added, and the only way that a
+ * backup server can transition to a primary server.
+ *
+ */
+ protected class RedundancySatisfierTask extends PoolTask {
+ // set by cancel() while holding lock; checked by run2() under the same lock
+ private boolean isCancelled;
+ // handle handed in through setFuture(); null until then
+ private ScheduledFuture future;
+
+ public void setFuture(ScheduledFuture future) {
+ this.future = future;
+ }
+
+ // NOTE(review): throws NullPointerException if setFuture() was never called,
+ // e.g. when scheduling was rejected in scheduleRedundancySatisfierIfNeeded.
+ public long getRemainingDelay() {
+ return future.getDelay(TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Waits until initialization has finished, then tries to recover the primary
+ * and the redundancy level. Unless a VirtualMachineError or CancelException
+ * is thrown, the satisfier is rescheduled afterwards if still needed.
+ */
+ @Override
+ public void run2() {
+ try {
+ initializedLatch.await();
+ synchronized (lock) {
+ if (isCancelled) {
+ return;
+ } else {
+ // this task is running now, so it is no longer the pending one
+ redundancySatisfierTask = null;
+ }
+ if(pool.getPoolOrCacheCancelInProgress()!=null) {
+ /* wake up waiters so they can detect cancel */
+ lock.notifyAll();
+ return;
+ }
+ }
+ // Do not try servers we are already connected to or that are black listed.
+ Set excludedServers = queueConnections.getAllLocations();
+ excludedServers.addAll(blackList.getBadServers());
+ excludedServers.addAll(factory.getBlackList().getBadServers());
+ recoverPrimary(excludedServers);
+ recoverRedundancy(excludedServers, true);
+ } catch (VirtualMachineError err) {
+ SystemFailure.initiateFailure(err);
+ // If this ever returns, rethrow the error. We're poisoned
+ // now, so don't let this thread continue.
+ throw err;
+ }
+ catch (CancelException e) {
+ throw e;
+ }
+ catch (Throwable t) {
+ // Whenever you catch Error or Throwable, you must also
+ // catch VirtualMachineError (see above). However, there is
+ // _still_ a possibility that you are dealing with a cascading
+ // error condition, so you also need to check to see if the JVM
+ // is still usable:
+ SystemFailure.checkFailure();
+ // NOTE(review): an InterruptedException from initializedLatch.await() would
+ // presumably also end up here, without the interrupt status being restored
+ // - confirm whether that matters for the recovery thread.
+ synchronized (lock) {
+ // record the failure and wake up waiters so they can observe it
+ if(t instanceof GemFireSecurityException) {
+ queueConnections = queueConnections.setPrimaryDiscoveryFailed((GemFireSecurityException) t);
+ } else {
+ queueConnections = queueConnections.setPrimaryDiscoveryFailed(null);
+ }
+ lock.notifyAll();
+ pool.getCancelCriterion().checkCancelInProgress(t);
+ logError(LocalizedStrings.QueueManagerImpl_ERROR_IN_REDUNDANCY_SATISFIER, t);
+ }
+ }
+
+ scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);
+ }
+
+ /**
+ * Cancels this task and clears the manager's reference to the pending task.
+ *
+ * @return false if the task had already been cancelled, true otherwise
+ */
+ public boolean cancel() {
+ synchronized (lock) {
+ if(isCancelled) {
+ return false;
+ }
+ isCancelled = true;
+ // false: do not interrupt the task if it is already running
+ future.cancel(false);
+ redundancySatisfierTask = null;
+ return true;
+ }
+ }
+
+ }
+
+  /**
+   * Delegates to {@link QueueConnectionImpl#loadEmergencyClasses()}.
+   */
+  public static void loadEmergencyClasses() {
+    QueueConnectionImpl.loadEmergencyClasses();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueState.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueState.java
new file mode 100755
index 0000000..e4df40a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueState.java
@@ -0,0 +1,17 @@
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.ScheduledExecutorService;
+import com.gemstone.gemfire.internal.cache.EventID;
+
+/**
+ * Client-side state of a subscription queue: marker handling, duplicate
+ * detection by event id, and the periodic task started through
+ * {@link #start(ScheduledExecutorService, int)}.
+ */
+public interface QueueState {
+
+  /** Called when a marker has been received. */
+  void processMarker();
+
+  /** @return whether a marker has already been processed */
+  boolean getProcessedMarker();
+
+  /** Increments the count of invalidations. */
+  void incrementInvalidatedStats();
+
+  /**
+   * @param eventId the id of the received event
+   * @param addToMap whether an event that is not a duplicate is recorded
+   * @return true if the event is a duplicate
+   */
+  boolean verifyIfDuplicate(EventID eventId, boolean addToMap);
+
+  /**
+   * @param eventId the id of the received event
+   * @return true if the event is a duplicate
+   */
+  boolean verifyIfDuplicate(EventID eventId);
+
+  /** test hook
+   */
+  java.util.Map getThreadIdToSequenceIdMap();
+
+  /**
+   * @param timer the executor on which the periodic work is scheduled
+   * @param interval the scheduling interval
+   */
+  void start(ScheduledExecutorService timer, int interval);
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueStateImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueStateImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueStateImpl.java
new file mode 100755
index 0000000..3dda60b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueStateImpl.java
@@ -0,0 +1,440 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.internal.cache.BridgeObserver;
+import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class QueueStateImpl implements QueueState {
+ private static final Logger logger = LogService.getLogger();
+
+ // the queue manager this state belongs to; used to reach the pool
+ protected QueueManager qManager = null;
+ // set to true by processMarker() once the first marker has been handled
+ private boolean processedMarker = false;
+ // incremented by incrementInvalidatedStats()
+ private final AtomicInteger invalidateCount = new AtomicInteger();
+
+ /**
+ * This will store the ThreadId to latest received sequence Id
+ *
+ * Keys are instances of {@link ThreadIdentifier} Values are instances of
+ * {@link SequenceIdAndExpirationObject}
+ *
+ * All access in this class is synchronized on the map itself.
+ */
+ protected final Map threadIdToSequenceId = new LinkedHashMap();
+
+ /**
+ * @param qm the queue manager this state belongs to
+ */
+ public QueueStateImpl(QueueManager qm) {
+ this.qManager = qm;
+ }
+
+  /**
+   * Handles the first marker that is received; any further marker is only
+   * logged at debug level.
+   */
+  public void processMarker() {
+    if (processedMarker) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}: extra marker received", this);
+      }
+      return;
+    }
+    handleMarker();
+    processedMarker = true;
+  }
+
+  /**
+   * @return true once the first marker has been handled
+   */
+  public boolean getProcessedMarker() {
+    return processedMarker;
+  }
+
+ public void handleMarker() {
+ ArrayList regions = new ArrayList();
+ Cache cache = GemFireCacheImpl.getInstance();
+ if (cache == null) {
+ return;
+ }
+
+ Set rootRegions = cache.rootRegions();
+
+
+ for (Iterator iter1 = rootRegions.iterator(); iter1.hasNext();) {
+ Region rootRegion = (Region) iter1.next();
+ regions.add(rootRegion);
+ try {
+ Set subRegions = rootRegion.subregions(true); // throws RDE
+ for (Iterator iter2 = subRegions.iterator(); iter2.hasNext();) {
+ regions.add(iter2.next());
+ }
+ } catch (RegionDestroyedException e) {
+ continue; // region is gone go to the next one bug 38705
+ }
+ }
+
+ for (Iterator iter = regions.iterator(); iter.hasNext();) {
+ LocalRegion region = (LocalRegion) iter.next();
+ try {
+ if (region.getAttributes().getPoolName()!=null && region.getAttributes().getPoolName().equals(qManager.getPool().getName())) {
+ region.handleMarker(); // can this throw RDE??
+ }
+ }
+ catch (RegionDestroyedException e) {
+ continue; // region is gone go to the next one bug 38705
+ }
+ }
+ }
+
+  /** Adds one to the invalidate counter. */
+  public void incrementInvalidatedStats() {
+    invalidateCount.incrementAndGet();
+  }
+  /** @return the current value of the invalidate counter */
+  public int getInvalidateCount() {
+    return invalidateCount.get();
+  }
+
+  /**
+   * test hook
+   *
+   * @return the live thread id to sequence id map, not a copy
+   */
+  public Map getThreadIdToSequenceIdMap() {
+    return threadIdToSequenceId;
+  }
+
+  /**
+   * Same as {@link #verifyIfDuplicate(EventID, boolean)} with addToMap set
+   * to true.
+   */
+  public boolean verifyIfDuplicate(EventID eid) {
+    final boolean addToMap = true;
+    return verifyIfDuplicate(eid, addToMap);
+  }
+
+ /**
+ * Checks whether an event has already been seen, based on the sequence ids
+ * recorded for its thread.
+ *
+ * @param eid the id of the received event
+ * @param addToMap if true, the sequence id of an event that is not a
+ * duplicate is recorded; if false, only the check against the event's own
+ * thread id is made and nothing is recorded
+ * @return true if the event is a duplicate and must be ignored
+ */
+ public boolean verifyIfDuplicate(EventID eid, boolean addToMap) {
+ ThreadIdentifier tid = new ThreadIdentifier(eid.getMembershipID(), eid
+ .getThreadID());
+ long seqId = eid.getSequenceID();
+ SequenceIdAndExpirationObject seo = null;
+
+ // Fix 36930: save the max sequence id for each non-putAll operation's thread
+ // There are 3 cases to consider:
+ // check the tid:
+ // 1) if duplicated, (both putall or non-putall): reject
+ // 2) if not duplicate
+ // 2.1)if putAll, check via real thread id again,
+ // if duplicate, reject (because one non-putall operation with bigger
+ // seqno has happened)
+ // otherwise save the putAllSeqno for real thread id
+ // and save seqno for tid
+ // 2.2) if not putAll,
+ // check putAllSequenceId with real thread id
+ // if request's seqno is smaller, reject (because one putAll operation
+ // with bigger seqno has happened)
+ // otherwise, update the seqno for tid
+ // lock taken to avoid concurrentModification
+ // while the objects are being expired
+ synchronized (this.threadIdToSequenceId) {
+ seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(tid);
+ if (seo != null && seo.getSequenceId() >= seqId) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(" got a duplicate entry with EventId {}. Ignoring the entry", eid);
+ }
+ seo.setAckSend(false); // bug #41289: send ack to this server since it's sending old events
+ return true;
+ }
+ else if (addToMap) {
+ ThreadIdentifier real_tid = new ThreadIdentifier(eid.getMembershipID(),
+ ThreadIdentifier.getRealThreadIDIncludingWan(eid.getThreadID()));
+ if (ThreadIdentifier.isPutAllFakeThreadID(eid.getThreadID())) {
+ // it's a putAll
+ seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(real_tid);
+ if (seo != null && seo.getSequenceId() >= seqId) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("got a duplicate putAll entry with eventId {}. Other operation with same thread id and bigger seqno {} has happened. Ignoring the entry", eid, seo.getSequenceId());
+ }
+ seo.setAckSend(false); // bug #41289: send ack to servers that send old events
+ return true;
+ }
+ else {
+ // save the seqno for real thread id into a putAllSequenceId
+ // (remove followed by put moves the entry to the end of the
+ // LinkedHashMap's iteration order)
+ this.threadIdToSequenceId.remove(real_tid);
+ this.threadIdToSequenceId.put(real_tid, seo == null?
+ new SequenceIdAndExpirationObject(-1, seqId):
+ new SequenceIdAndExpirationObject(seo.getSequenceId(), seqId));
+ // save seqno for tid
+ // here tid!=real_tid; for the fake tid the putAllSeqno is stored as -1
+ this.threadIdToSequenceId.remove(tid);
+ this.threadIdToSequenceId.put(tid, new SequenceIdAndExpirationObject(seqId, -1));
+ }
+ } else {
+ // non-putAll operation:
+ // check putAllSeqno for real thread id
+ // if request's seqno is smaller, reject
+ // otherwise, update the seqno for tid
+ seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(real_tid);
+ if (seo != null && seo.getPutAllSequenceId() >= seqId) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("got a duplicate non-putAll entry with eventId {}. One putAll operation with same real thread id and bigger seqno {} has happened. Ignoring the entry", eid, seo.getPutAllSequenceId());
+ }
+ seo.setAckSend(false); // bug #41289: send ack to servers that send old events
+ return true;
+ }
+ else {
+ // here tid==real_tid
+ this.threadIdToSequenceId.remove(tid);
+ this.threadIdToSequenceId.put(tid, seo == null?
+ new SequenceIdAndExpirationObject(seqId, -1):
+ new SequenceIdAndExpirationObject(seqId, seo.getPutAllSequenceId()));
+ }
+ }
+ }
+ }
+ return false;
+ }
+  /**
+   * Schedules the ack/expiry task on the given timer. The first run happens
+   * after {@code interval} milliseconds and every following run
+   * {@code interval} milliseconds after the previous one has finished.
+   */
+  public void start(ScheduledExecutorService timer, int interval) {
+    final ThreadIdToSequenceIdExpiryTask expiryTask = new ThreadIdToSequenceIdExpiryTask();
+    timer.scheduleWithFixedDelay(expiryTask, interval, interval, TimeUnit.MILLISECONDS);
+  }
+
+ /**
+ *
+ * Task which iterates over the threadIdToSequenceId map.
+ *
+ * 1) It sends an ack to the primary server for all threadIds for which it has
+ * not sent an ack. 2) It expires the entries which have exceeded the
+ * specified expiry time and for which an ack has already been sent.
+ *
+ * @author darrel
+ * @author Mitul Bid
+ * @author Suyog Bhokare
+ * @since 5.1
+ *
+ */
+
+ private class ThreadIdToSequenceIdExpiryTask extends PoolTask {
+ /**
+ * The expiry time of the entries in the map
+ */
+ private final long expiryTime;
+
+ /**
+ * constructs the task and initializes the expiry time from the pool's
+ * subscription message tracking timeout
+ *
+ */
+ public ThreadIdToSequenceIdExpiryTask() {
+ expiryTime = QueueStateImpl.this.qManager.getPool()
+ .getSubscriptionMessageTrackingTimeout();
+ }
+
+ @Override
+ public void run2() {
+ SystemFailure.checkFailure();
+ if (qManager.getPool().getCancelCriterion().cancelInProgress() != null) {
+ return;
+ }
+ if (PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG) {
+ BridgeObserver bo = BridgeObserverHolder.getInstance();
+ bo.beforeSendingClientAck();
+ }
+ // the ack is sent regardless of the redundancy level or durability
+ sendPeriodicAck();
+ checkForExpiry();
+ }
+
+ /**
+ * Removes expired entries from the map. An expired entry is removed only if
+ * its ack has been sent, or if the pool has no subscription redundancy and
+ * the client is not durable.
+ */
+ void checkForExpiry() {
+ synchronized (threadIdToSequenceId) {
+ Iterator iterator = threadIdToSequenceId.entrySet().iterator();
+ long currentTime = System.currentTimeMillis();
+ Map.Entry entry;
+ SequenceIdAndExpirationObject seo;
+
+ while (iterator.hasNext()) {
+ entry = (Map.Entry) iterator.next();
+ seo = (SequenceIdAndExpirationObject) entry.getValue();
+ if ((currentTime - seo.getCreationTime() > this.expiryTime)) {
+ if (seo.getAckSend()
+ || (qManager.getPool().getSubscriptionRedundancy() == 0 && !qManager.getPool().isDurableClient())) {
+ iterator.remove();
+ }
+ } else {
+ // the scan stops at the first entry that has not expired; this
+ // presumably relies on the map iterating oldest entry first - confirm
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Sends a periodic ack to the primary server for all threadIds for which it
+ * has not sent an ack. If sending fails, the affected entries are replaced
+ * by fresh objects so that they are acked again on a later run.
+ */
+ void sendPeriodicAck() {
+ List events = new ArrayList();
+ boolean success = false;
+ // collect the events to ack and optimistically mark them as acked
+ synchronized (threadIdToSequenceId) {
+ Iterator iterator = threadIdToSequenceId.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry entry = (Map.Entry) iterator.next();
+ SequenceIdAndExpirationObject seo = (SequenceIdAndExpirationObject) entry
+ .getValue();
+ if (!seo.getAckSend()) {
+ ThreadIdentifier tid = (ThreadIdentifier) entry.getKey();
+ events.add(new EventID(tid.getMembershipID(), tid.getThreadID(),
+ seo.getSequenceId()));
+ seo.setAckSend(true);
+ }// if ends
+ }// while ends
+ }// synchronized ends
+
+ if (events.size() > 0) {
+ try {
+ PrimaryAckOp.execute(qManager.getAllConnections().getPrimary(), qManager.getPool(), events);
+ success = true;
+ } catch (Exception ex) {
+ if (logger.isDebugEnabled())
+ logger.debug("Exception while sending an ack to the primary server: {}", ex);
+ } finally {
+ if (!success) {
+ // the ack was not sent: undo the optimistic marking done above
+ Iterator iter = events.iterator();
+ while (iter.hasNext()) {
+ EventID eid = (EventID) iter.next();
+ ThreadIdentifier tid = new ThreadIdentifier(
+ eid.getMembershipID(), eid.getThreadID());
+ synchronized (threadIdToSequenceId) {
+ SequenceIdAndExpirationObject seo = (SequenceIdAndExpirationObject) threadIdToSequenceId
+ .get(tid);
+ if (seo != null && seo.getAckSend()) {
+ seo = (SequenceIdAndExpirationObject) threadIdToSequenceId
+ .remove(tid);
+ if (seo != null) {
+ // put back the old seqId with a new time stamp
+ SequenceIdAndExpirationObject siaeo = new SequenceIdAndExpirationObject(
+ seo.getSequenceId(), seo.getPutAllSequenceId());
+ threadIdToSequenceId.put(tid, siaeo);
+ }
+ }// if ends
+ }// synchronized ends
+ }// while ends
+ }// if(!success) ends
+ }// finally ends
+ }// if(events.size() > 0)ends
+ }// method ends
+ }
+
+  /**
+   * Holds a sequence id and a putAll sequence id together with the time this
+   * holder was created (used for expiring the entry) and a flag recording
+   * whether the client ack has been sent to the server.
+   *
+   * @author Mitul Bid
+   * @since 5.1
+   */
+  public static class SequenceIdAndExpirationObject {
+    /** Time, in milliseconds, at which this object was constructed. */
+    private final long creationTime;
+    /** Sequence id of the entry. */
+    private final long sequenceId;
+    /** Sequence id of the putAll operations. */
+    private final long putAllSequenceId;
+    /** Whether the client ack has been sent to the server. */
+    private boolean ackSend;
+
+    /**
+     * Stamps the new object with the current time and leaves the ack flag
+     * cleared.
+     *
+     * @param sequenceId the sequence id to remember
+     * @param putAllSequenceId the putAll sequence id to remember
+     */
+    SequenceIdAndExpirationObject(long sequenceId, long putAllSequenceId) {
+      this.creationTime = System.currentTimeMillis();
+      this.ackSend = false;
+      this.sequenceId = sequenceId;
+      this.putAllSequenceId = putAllSequenceId;
+    }
+
+    /**
+     * @return the sequence id given at construction
+     */
+    public final long getSequenceId() {
+      return this.sequenceId;
+    }
+
+    /**
+     * @return the putAll sequence id given at construction
+     */
+    public final long getPutAllSequenceId() {
+      return this.putAllSequenceId;
+    }
+
+    /**
+     * @return the construction time in milliseconds
+     */
+    public final long getCreationTime() {
+      return this.creationTime;
+    }
+
+    /**
+     * @return the current value of the ack-sent flag
+     */
+    public boolean getAckSend() {
+      return this.ackSend;
+    }
+
+    /**
+     * Updates the ack-sent flag.
+     *
+     * @param ackSend the new flag value
+     */
+    public void setAckSend(boolean ackSend) {
+      this.ackSend = ackSend;
+    }
+
+    @Override
+    public String toString() {
+      return new StringBuilder("SequenceIdAndExpirationObject[")
+          .append("ackSend = ").append(this.ackSend)
+          .append("; creation = ").append(this.creationTime)
+          .append("; seq = ").append(this.sequenceId)
+          .append("; putAll seq = ").append(this.putAllSequenceId)
+          .append("]")
+          .toString();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
new file mode 100644
index 0000000..8d598d1
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
@@ -0,0 +1,82 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Tells the server we are ready to receive server-to-client events
+ * from durable subscriptions.
+ * @author darrel
+ * @since 5.7
+ */
+public class ReadyForEventsOp {
+
+  private ReadyForEventsOp() {
+    // static entry point only; never instantiated
+  }
+
+  /**
+   * Tells the primary server we are ready to receive server-to-client events
+   * from durable subscriptions.
+   * @param pool the pool to use to communicate with the server.
+   * @param primary the connection on which the operation is executed
+   */
+  public static void execute(ExecutablePool pool, QueueConnectionImpl primary) {
+    final AbstractOp readyOp = new ReadyForEventsOpImpl();
+    pool.executeOn(primary, readyOp);
+  }
+
+  /**
+   * The operation behind {@link #execute}: a {@code CLIENT_READY} message
+   * whose response is handled as a plain ack.
+   */
+  private static class ReadyForEventsOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public ReadyForEventsOpImpl() {
+      super(MessageType.CLIENT_READY, 1);
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      // Mask the current early-ack byte with MESSAGE_HAS_SECURE_PART before sending.
+      final byte maskedEarlyAck =
+          (byte) (getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART);
+      getMessage().setEarlyAck(maskedEarlyAck);
+      getMessage().send(false);
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "readyForEvents");
+      return null;
+    }
+
+    /** This operation does nothing with secure bytes. */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    /** This operation never needs a user id. */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    /** No message type is treated as an error response here. */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startReadyForEvents();
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endReadyForEventsSend(start, hasFailed());
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endReadyForEvents(start, hasTimedOut(), hasFailed());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
new file mode 100644
index 0000000..c3ab84b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
@@ -0,0 +1,129 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributesHolder;
+import com.gemstone.gemfire.internal.cache.BridgeObserver;
+import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+
+/**
+ * Sends a {@code REGISTER_DATASERIALIZERS} message through a pool. The message
+ * carries one (class name, id) pair per data serializer, followed by the
+ * bytes of the event id.
+ */
+public class RegisterDataSerializersOp {
+
+  /**
+   * Builds the operation from live {@link DataSerializer} instances and
+   * executes it on the pool.
+   *
+   * @param pool the pool the operation is executed on
+   * @param dataSerializers the serializers whose class names and ids are sent
+   * @param eventId the event id appended as the last message part
+   * @throws SerializationException if a class name cannot be serialized
+   */
+  public static void execute(ExecutablePool pool,
+      DataSerializer[] dataSerializers, EventID eventId) {
+    AbstractOp op = new RegisterDataSerializersOpImpl(dataSerializers,
+        eventId);
+    pool.execute(op);
+  }
+
+  /**
+   * Builds the operation from attribute holders (class name plus id) and
+   * executes it on the pool.
+   *
+   * @param pool the pool the operation is executed on
+   * @param holders the holders whose class names and ids are sent
+   * @param eventId the event id appended as the last message part
+   * @throws SerializationException if a class name cannot be serialized
+   */
+  public static void execute(ExecutablePool pool,
+      SerializerAttributesHolder[] holders, EventID eventId) {
+    AbstractOp op = new RegisterDataSerializersOpImpl(holders,
+        eventId);
+    pool.execute(op);
+  }
+
+  private RegisterDataSerializersOp() {
+    // no instances allowed
+  }
+
+  private static class RegisterDataSerializersOpImpl extends AbstractOp {
+
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public RegisterDataSerializersOpImpl(DataSerializer[] dataSerializers,
+        EventID eventId) {
+      // two parts per serializer (class name, id) plus one for the event id
+      super(MessageType.REGISTER_DATASERIALIZERS, dataSerializers.length * 2 + 1);
+      for (DataSerializer dataSerializer : dataSerializers) {
+        // Class.getName() is the plain class name; the previous code obtained
+        // the same string by slicing the "class " prefix off Class.toString().
+        addClassNamePart(dataSerializer.getClass().getName());
+        getMessage().addIntPart(dataSerializer.getId());
+      }
+      addEventIdPart(eventId);
+    }
+
+    /**
+     * @throws SerializationException
+     *           Thrown when serialization fails.
+     */
+    public RegisterDataSerializersOpImpl(SerializerAttributesHolder[] holders,
+        EventID eventId) {
+      // two parts per holder (class name, id) plus one for the event id
+      super(MessageType.REGISTER_DATASERIALIZERS, holders.length * 2 + 1);
+      for (SerializerAttributesHolder holder : holders) {
+        addClassNamePart(holder.getClassName());
+        getMessage().addIntPart(holder.getId());
+      }
+      addEventIdPart(eventId);
+    }
+
+    /**
+     * Serializes a class name to a blob and appends it as a bytes part.
+     *
+     * @throws SerializationException wrapping the IOException if that fails
+     */
+    private void addClassNamePart(String className) {
+      try {
+        getMessage().addBytesPart(BlobHelper.serializeToBlob(className));
+      } catch (IOException ex) {
+        throw new SerializationException("failed serializing object", ex);
+      }
+    }
+
+    /**
+     * Appends the event id bytes as the final part and then fires the
+     * observer callback when it is enabled.
+     */
+    private void addEventIdPart(EventID eventId) {
+      getMessage().addBytesPart(eventId.calcBytes());
+      // // CALLBACK FOR TESTING PURPOSE ONLY ////
+      if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        bo.beforeSendingToServer(eventId);
+      }
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "registerDataSerializers");
+      return null;
+    }
+
+    /** No message type is treated as an error response here. */
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startRegisterDataSerializers();
+    }
+
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endRegisterDataSerializersSend(start, hasFailed());
+    }
+
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endRegisterDataSerializers(start, hasTimedOut(), hasFailed());
+    }
+
+    /** This operation does nothing with secure bytes. */
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    /** This operation never needs a user id. */
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      // Mask the current early-ack byte with MESSAGE_HAS_SECURE_PART before sending.
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+  }
+}