Posted to commits@geode.apache.org by rv...@apache.org on 2015/04/28 23:40:11 UTC

[06/51] [partial] incubator-geode git commit: Init

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManagerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManagerImpl.java
new file mode 100644
index 0000000..71357e3
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueManagerImpl.java
@@ -0,0 +1,1478 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.GemFireException;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.NoSubscriptionServersAvailableException;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker.RegionInterestEntry;
+import com.gemstone.gemfire.cache.client.internal.ServerBlackList.BlackListListener;
+import com.gemstone.gemfire.cache.client.internal.ServerBlackList.BlackListListenerAdapter;
+import com.gemstone.gemfire.cache.client.internal.ServerBlackList.FailureTracker;
+import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.ClientCQ;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.cache.BridgeObserver;
+import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientUpdater;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ServerQueueStatus;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.security.GemFireSecurityException;
+import com.gemstone.org.jgroups.util.StringId;
+
+/**
+ * Manages Client Queues. Responsible for creating callback connections and
+ * satisfying redundancy requirements.
+ * 
+ * @author dsmith
+ * @since 5.7
+ * 
+ */
+public class QueueManagerImpl implements QueueManager {
+  private static final Logger logger = LogService.getLogger();
+
+//  private static final long SERVER_LOCATION_TIMEOUT = Long.getLong(
+//      "gemfire.QueueManagerImpl.SERVER_LOCATION_TIMEOUT", 120000).longValue();
+  private static final Comparator QSIZE_COMPARATOR = new QSizeComparator();
+
+  protected final long redundancyRetryInterval;
+  private final EndpointManager endpointManager;
+  private final EndpointManager.EndpointListenerAdapter endpointListener;
+  private final ConnectionSource source;
+  private final int redundancyLevel;
+  protected final ConnectionFactory factory;
+  private final InternalLogWriter securityLogger;
+  private final ClientProxyMembershipID proxyId;
+  protected final InternalPool pool;
+  private final QueueStateImpl state;
+  private boolean printPrimaryNotFoundError;
+  private boolean printRedundancyNotSatisfiedError;
+  private boolean printRecoveringPrimary;
+  private boolean printRecoveringRedundant;
+  protected final ServerBlackList blackList;
+  // Lock which guards updates to queueConnections.
+  // Also threads calling getAllConnections will wait on this
+  // lock until there is a primary.
+  protected final Object lock = new Object();
+  
+  protected final CountDownLatch initializedLatch = new CountDownLatch(1);
+
+  private ScheduledThreadPoolExecutor recoveryThread;
+  private volatile boolean sentClientReady;
+
+  // queueConnections is maintained using copy-on-write.
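+  // A minimal sketch of the access pattern used throughout this class:
+  //   ConnectionList snapshot = queueConnections;   // lock-free volatile read
+  //   Connection primary = snapshot.getPrimary();   // consistent view of the list
+  //   synchronized (lock) {                         // writers replace the whole list
+  //     queueConnections = queueConnections.addBackup(connection);
+  //   }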
+  protected volatile ConnectionList queueConnections = new ConnectionList();
+  protected volatile RedundancySatisfierTask redundancySatisfierTask = null;
+  private volatile boolean shuttingDown;
+
+  public QueueManagerImpl(
+      InternalPool pool, 
+      EndpointManager endpointManager,
+      ConnectionSource source, 
+      ConnectionFactory factory,
+      int queueRedundancyLevel, 
+      long redundancyRetryInterval, 
+      InternalLogWriter securityLogger,
+      ClientProxyMembershipID proxyId) {
+    this.printPrimaryNotFoundError = true;
+    this.printRedundancyNotSatisfiedError = true;
+    this.printRecoveringRedundant = true;
+    this.printRecoveringPrimary = true;
+    this.pool = pool;
+    this.endpointManager = endpointManager;
+    this.source = source;
+    this.factory = factory;
+    this.redundancyLevel = queueRedundancyLevel;
+    this.securityLogger = securityLogger;
+    this.proxyId = proxyId;
+    this.redundancyRetryInterval = redundancyRetryInterval;
+    blackList = new ServerBlackList(redundancyRetryInterval);
+    
+    
+    this.endpointListener = new EndpointManager.EndpointListenerAdapter() {
+      @Override
+      public void endpointCrashed(Endpoint endpoint) {
+        QueueManagerImpl.this.endpointCrashed(endpoint);
+      }
+    };
+    
+    this.state = new QueueStateImpl(this);
+  }
+
+  public InternalPool getPool() {
+    return pool;
+  }
+
+  public boolean isPrimaryUpdaterAlive() {
+    boolean result = false;
+    QueueConnectionImpl primary = (QueueConnectionImpl)
+      queueConnections.getPrimary();
+    if (primary != null) {
+      ClientUpdater cu = primary.getUpdater();
+      if (cu != null) {
+        result = ((CacheClientUpdater)cu).isAlive();
+      }
+    }
+    return result;
+  }
+  
+  public QueueConnections getAllConnectionsNoWait() {
+    return queueConnections;
+  }
+  
+  public QueueConnections getAllConnections() {
+
+    ConnectionList snapshot = queueConnections;
+    if (snapshot.getPrimary() == null) {
+      // wait for a new primary to become available.
+      synchronized (lock) {
+        snapshot = queueConnections;
+        while (snapshot.getPrimary() == null
+            && !snapshot.primaryDiscoveryFailed() && !shuttingDown && pool.getPoolOrCacheCancelInProgress()==null) {
+          try {
+            lock.wait();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            break;
+          }
+          snapshot = queueConnections;
+        }
+      }
+    }
+    
+    if (snapshot.getPrimary() == null) {
+      pool.getCancelCriterion().checkCancelInProgress(null);
+      GemFireException exception  = snapshot.getPrimaryDiscoveryException();
+      if(exception == null || exception instanceof NoSubscriptionServersAvailableException) {
+        exception = new NoSubscriptionServersAvailableException(exception);
+      }
+      else { 
+        exception = new ServerConnectivityException(exception.getMessage(), exception);
+      }
+      throw exception;
+    }
+
+    return snapshot;
+  }
+
+  public InternalLogWriter getSecurityLogger() {
+    return securityLogger;
+  }
+
+  public void close(boolean keepAlive) {
+    endpointManager.removeListener(endpointListener);
+    synchronized (lock) {
+      shuttingDown = true;
+      if (redundancySatisfierTask != null) {
+        redundancySatisfierTask.cancel();
+      }
+      lock.notifyAll();
+    }
+    if (recoveryThread != null) {
+      // it will be null if we never called start
+      recoveryThread.shutdown();
+      try {
+        if (!recoveryThread.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS)) {
+          logger.warn(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_TIMEOUT_WAITING_FOR_RECOVERY_THREAD_TO_COMPLETE));
+        }
+      } catch (InterruptedException e1) {
+        Thread.currentThread().interrupt();
+        logger.debug("Interrupted waiting for recovery thread termination");
+      }
+    }
+    
+    QueueConnectionImpl primary = (QueueConnectionImpl) queueConnections
+        .getPrimary();
+    if(logger.isDebugEnabled()) {
+      logger.debug("QueueManagerImpl - closing connections with keepAlive={}", keepAlive);
+    }
+    if (primary != null) {
+      try {
+        if(logger.isDebugEnabled()) {
+          logger.debug("QueueManagerImpl - closing primary {}", primary);
+        }
+        primary.internalClose(keepAlive);
+      } catch (Exception e) {
+        logger.warn(LocalizedMessage.create(
+          LocalizedStrings.QueueManagerImpl_ERROR_CLOSING_PRIMARY_CONNECTION_TO_0,
+          primary.getEndpoint()), e);
+      }
+    }
+
+    List backups = queueConnections.getBackups();
+    for (Iterator itr = backups.iterator(); itr.hasNext();) {
+      QueueConnectionImpl backup = (QueueConnectionImpl) itr.next();
+      if (backup != null) {
+        try {
+          if(logger.isDebugEnabled()) {
+            logger.debug("QueueManagerImpl - closing backup {}", backup);
+          }
+          backup.internalClose(keepAlive);
+        } catch (Exception e) {
+          logger.warn(LocalizedMessage.create(
+              LocalizedStrings.QueueManagerImpl_ERROR_CLOSING_BACKUP_CONNECTION_TO_0,
+              backup.getEndpoint()), e);
+        }
+      }
+    }
+  }
+  
+  
+  public void emergencyClose() {
+    shuttingDown = true;
+    queueConnections.getPrimary().emergencyClose();
+    List backups = queueConnections.getBackups();
+    for(int i = 0; i < backups.size(); i++) {
+      Connection backup = (Connection) backups.get(i);
+      backup.emergencyClose();
+    }
+  }
+
+  public void start(ScheduledExecutorService background) {
+    try {
+      blackList.start(background);
+      endpointManager.addListener(endpointListener);
+
+      // Use a separate timer for queue management tasks
+      // We don't want primary recovery (and therefore user threads) to wait for
+      // things like pinging connections for health checks.
+      // this.background = background;
+      final String name = "queueTimer-" + this.pool.getName();
+      this.recoveryThread = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
+        public Thread newThread(Runnable r) {
+          Thread result = new Thread(r, name);
+          result.setDaemon(true);
+          return result;
+        }
+      });
+      recoveryThread.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+
+      // TODO - use yet another Timer or the like for these tasks? We know we
+      // don't want them in the recoveryThread, because the
+      // ThreadIdToSequenceIdExpiryTask will wait for primary recovery.
+      getState().start(background, getPool().getSubscriptionAckInterval());
+
+      // initialize connections
+      initializeConnections();
+
+      scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);
+
+      //When a server is removed from the blacklist, try again
+      //to establish redundancy (if we need to)
+      BlackListListener blackListListener = new BlackListListenerAdapter() {
+        @Override
+        public void serverRemoved(ServerLocation location) {
+          QueueManagerImpl.this.scheduleRedundancySatisfierIfNeeded(0);
+        }
+      };
+
+      blackList.addListener(blackListListener);
+      factory.getBlackList().addListener(blackListListener);
+    } finally {
+      initializedLatch.countDown();
+    }
+  }
+  
+  
+
+  public void readyForEvents(InternalDistributedSystem system) {
+    synchronized(lock) {
+      this.sentClientReady = true;
+    }
+
+    QueueConnectionImpl primary = null;
+    while (primary == null) {
+      try {
+        primary = (QueueConnectionImpl) getAllConnections().getPrimary();
+      } catch(NoSubscriptionServersAvailableException e) {
+        primary = null;
+        break;
+      }
+      if(primary.sendClientReady()) {
+        try {
+          logger.info(LocalizedMessage.create(
+            LocalizedStrings.QueueManagerImpl_SENDING_READY_FOR_EVENTS_TO_PRIMARY_0,
+            primary));
+          ReadyForEventsOp.execute(pool, primary); 
+        } catch(Exception e) {
+          if(logger.isDebugEnabled()) {
+            logger.debug("Error sending ready for events to {}", primary, e);
+          }
+          primary.destroy();
+          primary = null;
+        }
+      }
+    }
+  }
+  
+  public void readyForEventsAfterFailover(QueueConnectionImpl primary) {
+    try {
+      logger.info(LocalizedMessage.create(
+        LocalizedStrings.QueueManagerImpl_SENDING_READY_FOR_EVENTS_TO_PRIMARY_0, primary));
+      ReadyForEventsOp.execute(pool, primary);
+    } catch(Exception e) {
+      if(logger.isDebugEnabled()) {
+        logger.debug("Error sending ready for events to {}", primary, e);
+      }
+      primary.destroy();
+    }
+  }
+
+  void connectionCrashed(Connection con) {
+    // the endpoint has not crashed but this method does all the work
+    // we need to do
+    endpointCrashed(con.getEndpoint());
+  }
+  
+  void endpointCrashed(Endpoint endpoint) {
+    QueueConnectionImpl deadConnection = null;
+    // We must be synchronized while checking whether we have a queue connection
+    // for the endpoint, because we need to prevent a race between adding a queue
+    // connection to the map and the endpoint for that connection crashing.
+    synchronized (lock) {
+      deadConnection = queueConnections.getConnection(endpoint);
+      if (deadConnection != null) {
+        queueConnections = queueConnections.removeConnection(deadConnection);
+      }
+    }
+    if (deadConnection != null) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_SUBSCRIPTION_ENDPOINT_CRASHED_SCHEDULING_RECOVERY, 
+                        new Object[]{deadConnection.getUpdater() != null ?(deadConnection.getUpdater().isPrimary()? "Primary" : "Redundant") : "Queue", endpoint}));
+      scheduleRedundancySatisfierIfNeeded(0);
+      deadConnection.internalDestroy();
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Ignoring crashed endpoint {} it does not have a queue.", endpoint);
+      }
+    }
+  }
+  /**
+   * Checks whether a queue connection exists on this endpoint. If one does, it
+   * is destroyed, since the client updater thread is no longer reading from it.
+   */
+  public void checkEndpoint(ClientUpdater ccu, Endpoint endpoint)
+  {
+    QueueConnectionImpl deadConnection = null;
+    
+    synchronized (lock) {
+      if(shuttingDown)
+        return;
+      // Only remove the connection if it still has the same client updater; a new
+      // updater/connection may already have been created on this endpoint.
+      deadConnection = queueConnections.getConnection(endpoint);
+      if (deadConnection != null && ccu.equals(deadConnection.getUpdater())) {
+        queueConnections = queueConnections.removeConnection(deadConnection);        
+        deadConnection.internalDestroy();
+      }      
+    }    
+    
+    logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_CACHE_CLIENT_UPDATER_FOR_ON_ENDPOINT_EXITING_SCHEDULING_RECOVERY,
+                new Object[]{(deadConnection != null && deadConnection.getUpdater() != null)?(deadConnection.getUpdater().isPrimary()? "Primary" : "Redundant"): "Queue", endpoint}));
+    scheduleRedundancySatisfierIfNeeded(0); // give recovery one more chance
+  }
+
+  private void initializeConnections() {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (isDebugEnabled) {
+      logger.debug("SubscriptionManager - intitializing connections");
+    }
+    
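+    // A redundancy level of N needs N backup queues plus one primary; -1 appears
+    // to request a queue on every available server (no fixed redundancy target).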
+    int queuesNeeded = redundancyLevel == -1 ? -1 : redundancyLevel + 1;
+    Set excludedServers = new HashSet(blackList.getBadServers());
+    List servers = findQueueServers(excludedServers, queuesNeeded, true, false, null);
+
+    if (servers == null || servers.isEmpty()) {
+      logger.warn(
+        LocalizedStrings.QueueManagerImpl_COULD_NOT_CREATE_A_QUEUE_NO_QUEUE_SERVERS_AVAILABLE);
+      scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);
+      synchronized (lock) {
+        queueConnections = queueConnections.setPrimaryDiscoveryFailed(null);
+        lock.notifyAll();
+      }
+      return;
+    }
+
+    if (isDebugEnabled) {
+      logger.debug("SubscriptionManager - discovered subscription servers {}", servers);
+    }
+
+    SortedMap/* <ServerQueueStatus,Connection> */oldQueueServers = new TreeMap(
+        QSIZE_COMPARATOR);
+    List nonRedundantServers = new ArrayList();
+
+    for (Iterator itr = servers.iterator(); itr.hasNext();) {
+      ServerLocation server = (ServerLocation) itr.next();
+      Connection connection = null;
+      try {
+        connection = factory.createClientToServerConnection(server, true);
+      } catch(GemFireSecurityException e) {
+        throw e;
+      } catch (Exception e) {
+        if (isDebugEnabled) {
+          logger.debug("SubscriptionManager - Error connected to server: {}", server, e);
+        }
+      }
+      if (connection != null) {
+        ServerQueueStatus status = connection.getQueueStatus();
+        if (status.isRedundant() || status.isPrimary()) {
+          oldQueueServers.put(status, connection);
+        } else {
+          nonRedundantServers.add(connection);
+        }
+      }
+    }
+
+    // This ordering was determined from the old ConnectionProxyImpl code
+    //
+    // initialization order of the new redundant and primary server is
+    // old redundant w/ second largest queue
+    // old redundant w/ third largest queue
+    // ...
+    // old primary
+    // non redundants in no particular order
+    //
+    // The primary is then chosen as
+    // redundant with the largest queue
+    // primary if there are no redundants
+    // a non redundant
+
+    // if the redundant with the largest queue fails, then we go and
+    // make a new server a primary.
+
+    Connection newPrimary = null;
+    if (!oldQueueServers.isEmpty()) {
+      newPrimary = (Connection) oldQueueServers.remove(oldQueueServers
+          .lastKey());
+    } else if (!nonRedundantServers.isEmpty()) {
+      newPrimary = (Connection) nonRedundantServers.remove(0);
+    }
+    
+    nonRedundantServers.addAll(0, oldQueueServers.values());
+
+    for (Iterator itr = nonRedundantServers.iterator(); itr.hasNext();) {
+      Connection connection = (Connection) itr.next();
+      QueueConnectionImpl queueConnection = initializeQueueConnection(
+          connection, false, null);
+      if (queueConnection != null) {
+        addToConnectionList(queueConnection, false);
+      }
+    }
+
+    QueueConnectionImpl primaryQueue = null;
+    if (newPrimary != null) {
+      primaryQueue = initializeQueueConnection(newPrimary, true, null);
+      if (primaryQueue == null) {
+        newPrimary.destroy();
+      } else {
+        if(!addToConnectionList(primaryQueue, true)) {
+          primaryQueue = null;
+        }
+      }
+    }
+
+
+    excludedServers.addAll(servers);
+
+    // Make sure we have enough redundant copies. Some of the connections may
+    // have failed above.
+    if (redundancyLevel != -1 && getCurrentRedundancy() < redundancyLevel) {
+      if (isDebugEnabled) {
+        logger.debug("SubscriptionManager - Some initial connections failed. Trying to create redundant queues");
+      }
+      recoverRedundancy(excludedServers, false);
+    }
+
+    if (redundancyLevel != -1 && primaryQueue == null) {
+      if (isDebugEnabled) {
+        logger.debug("SubscriptionManager - Intial primary creation failed. Trying to create a new primary");
+      }
+      while(primaryQueue == null) { 
+        primaryQueue = createNewPrimary(excludedServers);
+        if(primaryQueue == null) {
+          //couldn't find a server to make primary
+          break;
+        }
+        if(!addToConnectionList(primaryQueue, true)) {
+          excludedServers.add(primaryQueue.getServer());
+          primaryQueue = null;
+        }
+      }
+    }
+
+    if (primaryQueue == null) {
+      if (isDebugEnabled) {
+        logger.debug("SubscriptionManager - Unable to create a new primary queue, using one of the redundant queues");
+      }
+      while(primaryQueue == null) {
+        primaryQueue = promoteBackupToPrimary(queueConnections.getBackups());
+        if(primaryQueue == null) {
+          //no backup servers available
+          break;
+        }
+        if(!addToConnectionList(primaryQueue, true)) {
+          synchronized(lock) {
+            //make sure we don't retry this same connection.
+            queueConnections = queueConnections.removeConnection(primaryQueue);
+          }
+          primaryQueue = null;
+        }
+      }
+    }
+
+    if (primaryQueue == null) {
+      logger.error(LocalizedMessage.create(
+          LocalizedStrings.QueueManagerImpl_COULD_NOT_INITIALIZE_A_PRIMARY_QUEUE_ON_STARTUP_NO_QUEUE_SERVERS_AVAILABLE));
+      synchronized (lock) {
+        queueConnections = queueConnections.setPrimaryDiscoveryFailed(
+            new NoSubscriptionServersAvailableException(LocalizedStrings.QueueManagerImpl_COULD_NOT_INITIALIZE_A_PRIMARY_QUEUE_ON_STARTUP_NO_QUEUE_SERVERS_AVAILABLE.toLocalizedString()));
+        lock.notifyAll();
+      }
+      cqsDisconnected();
+    }
+    else {
+      cqsConnected();
+    }
+
+    if (getCurrentRedundancy() < redundancyLevel) {
+      logger.warn(LocalizedMessage.create(
+          LocalizedStrings.QueueManagerImpl_UNABLE_TO_INITIALIZE_ENOUGH_REDUNDANT_QUEUES_ON_STARTUP_THE_REDUNDANCY_COUNT_IS_CURRENTLY_0,  
+          getCurrentRedundancy()));
+    }
+  }
+
+  private void cqsConnected() {
+    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    if (cache != null) {
+      CqService cqService = cache.getCqService();
+      // A primary queue was found; alert the affected CQs if necessary.
+      cqService.cqsConnected(pool);
+    }
+  }
+
+  private void cqsDisconnected() {
+    //No primary queue was found, alert the affected cqs if necessary
+    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+    if(cache != null) {
+      CqService cqService = cache.getCqService();
+      cqService.cqsDisconnected(pool);
+    }
+  }
+
+  private int getCurrentRedundancy() {
+    return queueConnections.getBackups().size();
+  }
+  
+  /**
+   * Make sure that we have enough backup servers.
+   * 
+   * Adds any servers we fail to connect to into the excluded-servers set.
+   */
+  protected boolean recoverRedundancy(Set excludedServers, boolean recoverInterest) {
+    if(pool.getPoolOrCacheCancelInProgress() != null) {
+      return true;
+    }
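+    // Loop until the requested number of backups exists; with redundancy -1 we
+    // keep creating a backup on every server that is not excluded.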
+    int additionalBackups;
+    while (pool.getPoolOrCacheCancelInProgress() == null
+        && ((additionalBackups = redundancyLevel - getCurrentRedundancy()) > 0
+            || redundancyLevel == -1)) {
+
+      if (redundancyLevel != -1 && printRecoveringRedundant) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_SUBSCRIPTION_MANAGER_REDUNDANCY_SATISFIER_REDUNDANT_ENDPOINT_HAS_BEEN_LOST_ATTEMPTIMG_TO_RECOVER));
+        printRecoveringRedundant = false;
+      }
+      
+      List servers = findQueueServers(excludedServers, redundancyLevel == -1 ? -1 : additionalBackups, false, 
+                            (redundancyLevel == -1 ? false : printRedundancyNotSatisfiedError),
+                                    LocalizedStrings.QueueManagerImpl_COULD_NOT_FIND_SERVER_TO_CREATE_REDUNDANT_CLIENT_QUEUE);
+
+      if (servers == null || servers.isEmpty()) {
+        if (redundancyLevel != -1) {
+          
+          if (printRedundancyNotSatisfiedError) {
+            logger.info(LocalizedMessage.create(
+                LocalizedStrings.QueueManagerImpl_REDUNDANCY_LEVEL_0_IS_NOT_SATISFIED_BUT_THERE_ARE_NO_MORE_SERVERS_AVAILABLE_REDUNDANCY_IS_CURRENTLY_1,
+                new Object[] { Integer.valueOf(redundancyLevel), Integer.valueOf(getCurrentRedundancy()) }));
+          }
+        }
+        printRedundancyNotSatisfiedError = false; // printed above
+        return false;
+      }
+      excludedServers.addAll(servers);
+
+      final boolean isDebugEnabled = logger.isDebugEnabled();
+      for (Iterator itr = servers.iterator(); itr.hasNext();) {
+        ServerLocation server = (ServerLocation) itr.next();
+        Connection connection = null;
+        try {
+          connection = factory.createClientToServerConnection(server, true);
+        } catch(GemFireSecurityException e) {
+          throw e;
+        } catch (Exception e) {
+          if (isDebugEnabled) {
+            logger.debug("SubscriptionManager - Error connecting to server: ()", server, e);
+          }
+        }
+        if (connection == null) {
+          continue;
+        }
+
+        QueueConnectionImpl queueConnection = initializeQueueConnection(
+            connection, false, null);
+        if (queueConnection != null) {
+          boolean isFirstNewConnection = false;
+          synchronized (lock) {
+            if (recoverInterest && queueConnections.getPrimary() == null
+                && queueConnections.getBackups().isEmpty()) {
+              // We lost our queue at some point, so we need to recover interest.
+              // This server will be made primary after this method finishes,
+              // because whoever killed the primary when this method started
+              // should have scheduled a task to recover the primary.
+              isFirstNewConnection = true;
+              // TODO - Actually, we need a better check than the above. There's
+              // still a chance that we haven't realized that the primary has died
+              // but it is already gone. We should get some information from the
+              // queue server about whether it was able to copy the queue from
+              // another server, and decide if we need to recover our interest
+              // based on that information.
+            }
+          }
+          boolean promotionFailed = false;
+          if (isFirstNewConnection) {
+            if (!promoteBackupCnxToPrimary(queueConnection)) {
+              promotionFailed = true;
+            }
+          }
+          if (!promotionFailed) {
+            if (addToConnectionList(queueConnection, isFirstNewConnection)) {
+              //redundancy satisfied
+              printRedundancyNotSatisfiedError = true;
+              printRecoveringRedundant = true;
+              if (logger.isDebugEnabled()) {
+                logger.debug("SubscriptionManager redundancy satisfier - created a queue on server {}", queueConnection.getEndpoint());
+              }
+              // Even though the new redundant queue will usually recover
+              // subscription information (see bug #39014) from its initial
+              // image provider, in bug #42280 we found that this is not always
+              // the case, so clients must always register interest with the new
+              // redundant server.
+              if(recoverInterest) {
+                recoverInterest(queueConnection, isFirstNewConnection);
+              }
+            }
+          } 
+        }
+      } 
+    }    
+    return true;
+  }
+
+  private QueueConnectionImpl promoteBackupToPrimary(List backups) {
+    QueueConnectionImpl primary = null;
+    for (int i = 0; primary == null && i < backups.size(); i++) {
+      QueueConnectionImpl lastConnection = (QueueConnectionImpl) backups.get(i);
+      if (promoteBackupCnxToPrimary(lastConnection)) {
+        primary = lastConnection;
+      }
+    }
+    return primary;
+  }
+
+  private boolean promoteBackupCnxToPrimary(QueueConnectionImpl cnx) {
+    boolean result = false;
+    if (PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG) {
+      BridgeObserver bo = BridgeObserverHolder.getInstance();
+      bo.beforePrimaryIdentificationFromBackup();
+    }
+    try {
+      boolean haveSentClientReady = this.sentClientReady;
+      if(haveSentClientReady) {
+        cnx.sendClientReady();
+      }
+      ClientUpdater updater = cnx.getUpdater();
+      if(updater == null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("backup connection was destroyed before it could become the primary.");
+        }
+        Assert.assertTrue(cnx.isDestroyed());
+      } else {
+        updater.setFailedUpdater(queueConnections.getFailedUpdater());
+        MakePrimaryOp.execute(pool, cnx, haveSentClientReady);
+        result = true;
+        if (PoolImpl.AFTER_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG) {
+          BridgeObserver bo = BridgeObserverHolder.getInstance();
+          bo.afterPrimaryIdentificationFromBackup(cnx.getServer());
+        }
+      }
+    } catch (Exception e) {
+      if(pool.getPoolOrCacheCancelInProgress() == null && logger.isDebugEnabled()) {
+        logger.debug("Error making a backup server the primary server for client subscriptions", e);
+      }
+    }
+    return result;
+  }
+  /**
+   * Create a new primary server from a non-redundant server.
+   * 
+   * Add any failed servers to the excludedServers set.
+   */
+  private QueueConnectionImpl createNewPrimary(Set excludedServers) {
+    QueueConnectionImpl primary = null;
+    while (primary == null && pool.getPoolOrCacheCancelInProgress()==null) {
+      List servers = findQueueServers(excludedServers, 1, false, 
+                                       printPrimaryNotFoundError,
+                                       LocalizedStrings.QueueManagerImpl_COULD_NOT_FIND_SERVER_TO_CREATE_PRIMARY_CLIENT_QUEUE);
+      printPrimaryNotFoundError = false; //printed above
+      if (servers == null || servers.isEmpty()) {
+        break;
+      }
+
+      Connection connection = null;
+      try {
+        connection = factory
+          .createClientToServerConnection((ServerLocation) servers.get(0), true);
+      } catch (GemFireSecurityException e) {
+        throw e;
+      } catch(Exception e) {
+        if(logger.isDebugEnabled()) {
+          logger.debug("SubscriptionManagerImpl - error creating a connection to server {}", servers.get(0));
+        }
+      }
+      if (connection != null) {
+        primary = initializeQueueConnection(connection, true, queueConnections.getFailedUpdater());
+      }
+      excludedServers.addAll(servers);
+    }
+    
+    if(primary != null && sentClientReady && primary.sendClientReady()) {
+      readyForEventsAfterFailover(primary);
+    }
+    return primary;
+  }
+
+  private List findQueueServers(Set excludedServers, int count,
+      boolean findDurable, boolean printErrorMessage, StringId msgId) {
+    List servers = null;
+    Exception ex = null;
+    try {
+      if(pool.getPoolOrCacheCancelInProgress()!=null) {
+        return null;
+      }
+      servers = source.findServersForQueue(excludedServers, count, proxyId, findDurable);
+    } catch(GemFireSecurityException e) {
+      //propagate the security exception immediately.
+      throw e;
+    } catch (Exception e) {
+      /*logger
+          .warning(
+              LocalizedStrings.QueueManagerImpl_COULD_NOT_RETRIEVE_LIST_OF_SERVERS_FOR_SUBSCRIPTION_0,
+              new Object[] { e.getMessage() });*/
+      ex = e;
+      if (logger.isDebugEnabled()) {
+        logger.debug("SubscriptionManager - Error getting the list of servers: {}", e);
+      }
+    }
+    
+    if (printErrorMessage && (servers == null || servers.isEmpty())) {
+      logger.error(LocalizedMessage.create(msgId,
+          new Object[] { (excludedServers != null ? excludedServers.size() : 0),
+              (ex != null ? ex.getMessage() : "no exception") }));
+    }
+    return servers;
+  }
+
+  /**
+   * Find a new primary, adding any failed servers we encounter to the excluded
+   * servers list.
+   * 
+   * First we try to make a backup server the primary, but if we run out of
+   * backup servers we will try to find a new server.
+   */
+  protected void recoverPrimary(Set excludedServers) {
+    if(pool.getPoolOrCacheCancelInProgress() != null) {
+      return;
+    }
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (queueConnections.getPrimary() != null) {
+      if (isDebugEnabled) {
+        logger.debug("Primary recovery not needed");
+      }
+      return;
+    }
+
+    if (isDebugEnabled) {
+      logger.debug("SubscriptionManager redundancy satisfier - primary endpoint has been lost. Attempting to recover");
+    }
+
+    if (printRecoveringPrimary) {
+      logger.info(LocalizedMessage.create(LocalizedStrings.QueueManagerImpl_SUBSCRIPTION_MANAGER_REDUNDANCY_SATISFIER_PRIMARY_ENDPOINT_HAS_BEEN_LOST_ATTEMPTIMG_TO_RECOVER));
+      printRecoveringPrimary = false;
+    }
+    
+    QueueConnectionImpl newPrimary = null;
+    while(newPrimary == null && pool.getPoolOrCacheCancelInProgress()==null) {
+      List backups = queueConnections.getBackups();
+      newPrimary = promoteBackupToPrimary(backups);
+      // Even after a successful promotion, the promoted server may have crashed;
+      // addToConnectionList below detects that, and we retry with another backup.
+      if(newPrimary == null) {
+        //could not find a backup to promote
+        break;
+      }
+      if(!addToConnectionList(newPrimary, true)) {
+        synchronized(lock) {
+          //make sure we don't retry the same backup server
+          queueConnections = queueConnections.removeConnection(newPrimary);
+        }
+        newPrimary = null;
+      }
+
+    }
+    
+    if(newPrimary != null) {
+      if (isDebugEnabled) {
+        logger.debug("SubscriptionManager redundancy satisfier - Switched backup server to primary: {}", newPrimary.getEndpoint());
+      }
+      if (PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG) {
+        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        bo.afterPrimaryRecovered(newPrimary.getServer());
+      }
+     
+      // A new primary from a backup server was found; alert affected CQs if necessary.
+      cqsConnected();
+      printPrimaryNotFoundError = true;
+      printRecoveringPrimary = true;
+      return;
+    }
+
+    while(newPrimary == null) {
+      newPrimary = createNewPrimary(excludedServers);
+      if(newPrimary == null) {
+        //could not find a new primary to create
+        break;
+      }
+      if(!addToConnectionList(newPrimary, true)) {
+        excludedServers.add(newPrimary.getServer());
+        newPrimary = null;
+      }
+      
+      if (newPrimary != null) {
+        if (isDebugEnabled) {
+          logger.debug("SubscriptionManager redundancy satisfier - Non backup server was made primary. Recovering interest {}", newPrimary.getEndpoint());
+        }
+
+        if(!recoverInterest(newPrimary, true)) {
+          excludedServers.add(newPrimary.getServer());
+          newPrimary = null;
+        }
+        //New primary queue was found from a non backup, alert the affected cqs
+        cqsConnected();
+      }
+
+      if (newPrimary != null && PoolImpl.AFTER_PRIMARY_RECOVERED_CALLBACK_FLAG) {
+        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        bo.afterPrimaryRecovered(newPrimary.getServer());
+      }
+      printPrimaryNotFoundError = true;
+      printRecoveringPrimary = true;
+      return;
+    }
+    //No primary queue was found, alert the affected cqs
+    cqsDisconnected();
+    if (isDebugEnabled) {
+      logger.debug("SubscriptionManager redundancy satisfier - Could not recover a new primary");
+    }
+    synchronized (lock) {
+      queueConnections = queueConnections.setPrimaryDiscoveryFailed(null);
+      lock.notifyAll();
+    }
+  }
+
+  private QueueConnectionImpl initializeQueueConnection(Connection connection,
+      boolean isPrimary, ClientUpdater failedUpdater) {
+    QueueConnectionImpl queueConnection = null;
+    FailureTracker failureTracker = blackList.getFailureTracker(connection.getServer());
+    try {
+      ClientUpdater updater = factory.createServerToClientConnection(connection
+          .getEndpoint(), this, isPrimary, failedUpdater);
+      if (updater != null) {
+        queueConnection = new QueueConnectionImpl(this, connection, updater, failureTracker);
+      } else {
+        logger.warn(LocalizedMessage.create(
+            LocalizedStrings.QueueManagerImpl_UNABLE_TO_CREATE_A_SUBSCRIPTION_CONNECTION_TO_SERVER_0,
+            connection.getEndpoint()));
+      }
+    } catch (Exception e) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("error creating subscription connection to server {}", connection.getEndpoint(), e);
+      }
+    }
+    if (queueConnection == null) {
+      failureTracker.addFailure();
+      connection.destroy();
+    }
+    return queueConnection;
+  }
+  // Need a flag indicating whether the primary was created from a backup.
+  // For a backup queue, the endpoint may crash just before we add the connection,
+  // and the CCU may then die because its endpoint closed. So before adding the
+  // connection we must check whether a crash happened, so that we can recover from it.
+  private boolean addToConnectionList(QueueConnectionImpl connection, boolean isPrimary) {
+    boolean isBadConnection;
+    synchronized (lock) {
+      ClientUpdater cu = connection.getUpdater();
+      if (cu == null || !cu.isAlive() || !cu.isProcessing()) {
+        return false; // don't add
+      }
+      // The CCU can still die after this check, but it will then run checkEndpoint
+      // under the lock, which removes the connection and reschedules recovery.
+      if(connection.getEndpoint().isClosed() || shuttingDown || pool.getPoolOrCacheCancelInProgress()!=null) {
+        isBadConnection = true;
+      } else {
+        isBadConnection = false;
+        if(isPrimary) {
+          queueConnections = queueConnections.setPrimary(connection);
+          lock.notifyAll();
+        } else {          
+          queueConnections = queueConnections.addBackup(connection);
+        }
+      }
+    }
+    
+    if(isBadConnection) {
+      if(logger.isDebugEnabled()) {
+        logger.debug("Endpoint {} crashed while creating a connection. The connection will be destroyed", connection.getEndpoint());
+      }
+      connection.internalDestroy();
+    }
+    
+    return !isBadConnection;
+  }
+
+  protected void scheduleRedundancySatisfierIfNeeded(long delay) {
+    if(shuttingDown) {
+      return;
+    }
+    
+    synchronized (lock) {
+      if(shuttingDown) {
+        return;
+      }
+      if (queueConnections.getPrimary() == null
+          || getCurrentRedundancy() < redundancyLevel || redundancyLevel == -1
+          || queueConnections.primaryDiscoveryFailed()) {
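+        // If a satisfier is already scheduled further out than the requested
+        // delay, cancel and reschedule it sooner; otherwise keep the earlier task.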
+        if (redundancySatisfierTask != null) {
+          if (redundancySatisfierTask.getRemainingDelay() > delay) {
+            redundancySatisfierTask.cancel();
+          } else {
+            return;
+          }
+        }
+
+        redundancySatisfierTask = new RedundancySatisfierTask();
+        try {
+          ScheduledFuture future = recoveryThread.schedule(redundancySatisfierTask,
+              delay, TimeUnit.MILLISECONDS);
+          redundancySatisfierTask.setFuture(future);
+        } catch (RejectedExecutionException e) {
+          // ignore; the recovery executor has been shut down, which means we're shutting down.
+        }
+      }
+    }
+  }
+  
+
+  private boolean recoverInterest(final QueueConnectionImpl newConnection,
+      final boolean isFirstNewConnection) {
+    
+    if(pool.getPoolOrCacheCancelInProgress() != null) {
+      return true;
+    }
+    
+    // recover interest
+    try {
+      recoverAllInterestTypes(newConnection, isFirstNewConnection);
+      newConnection.getFailureTracker().reset();
+      return true;
+    } 
+    catch (CancelException ignore) {
+      return true;
+      // ok to ignore we are being shutdown
+    } catch (VirtualMachineError err) {
+      SystemFailure.initiateFailure(err);
+      // If this ever returns, rethrow the error. We're poisoned
+      // now, so don't let this thread continue.
+      throw err;
+    } catch (Throwable t) {
+      SystemFailure.checkFailure();
+      pool.getCancelCriterion().checkCancelInProgress(t);
+      logger.warn(LocalizedMessage.create(
+        LocalizedStrings.QueueManagerImpl_QUEUEMANAGERIMPL_FAILED_TO_RECOVER_INTEREST_TO_SERVER_0,
+        newConnection.getServer()), t);
+      newConnection.getFailureTracker().addFailure();
+      newConnection.destroy();
+      return false;
+    }
+  }
+
+  public QueueState getState() {
+    return this.state;
+  }
+
+  private void recoverSingleList(int interestType, Connection recoveredConnection,
+      boolean isDurable, boolean receiveValues, boolean isFirstNewConnection) {
+    Iterator i = this.getPool().getRITracker()
+        .getRegionToInterestsMap(interestType, isDurable, !receiveValues).values().iterator();
+    while (i.hasNext()) { // restore a region
+      RegionInterestEntry e = (RegionInterestEntry) i.next();
+      recoverSingleRegion(e.getRegion(), e.getInterests(), interestType,
+          recoveredConnection, isDurable, receiveValues, isFirstNewConnection);
+    } // restore a region
+  }
+
+  private void recoverCqs(Connection recoveredConnection, boolean isDurable) {
+    Map cqs = this.getPool().getRITracker().getCqsMap();
+    Iterator i = cqs.entrySet().iterator();
+    while(i.hasNext()) {
+      Map.Entry e = (Map.Entry)i.next();
+      ClientCQ cqi = (ClientCQ)e.getKey();
+      String name = cqi.getName();
+      if (this.pool.getMultiuserAuthentication()) {
+        UserAttributes.userAttributes.set(((DefaultQueryService)this.pool
+            .getQueryService()).getUserAttributes(name));
+      }
+      try {
+        if (((CqStateImpl)cqi.getState()).getState() != CqStateImpl.INIT) {
+          cqi.createOn(recoveredConnection, isDurable);
+        }
+      } finally {
+        UserAttributes.userAttributes.set(null);
+      }
+    }
+  }
+  
+  // TODO this is distressingly similar to LocalRegion#processSingleInterest
+  private void recoverSingleRegion(LocalRegion r, Map keys, int interestType,
+      Connection recoveredConnection, boolean isDurable,
+      boolean receiveValues, boolean isFirstNewConnection) {
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("{}.recoverSingleRegion starting kind={} region={}: {}", this, InterestType.getString(interestType), r.getFullPath(), keys);
+    }
+    
+    // build a HashMap, key is policy, value is list
+    HashMap policyMap = new HashMap();
+    Iterator keysIter = keys.entrySet().iterator();
+    while (keysIter.hasNext()) { // restore and commit an interest
+      Map.Entry me = (Map.Entry) keysIter.next();
+      Object key = me.getKey();
+      InterestResultPolicy pol = (InterestResultPolicy) me.getValue();
+      
+      if (interestType == InterestType.KEY) {
+        // Gester: we only consolidate keys into a list for InterestType.KEY
+        LinkedList keyList = (LinkedList) policyMap.get(pol);
+        if (keyList == null) {
+          keyList = new LinkedList();
+        }
+        keyList.add(key);
+        policyMap.put(pol, keyList);
+      } else {
+        // for other interest types, register one key at a time
+        recoverSingleKey(r, key, pol, interestType, recoveredConnection,
+            isDurable, receiveValues, isFirstNewConnection);
+      }
+    }
+    
+    // Process InterestType.KEY: iterate the consolidated key list for each policy
+    Iterator polIter = policyMap.entrySet().iterator();
+    while (polIter.hasNext()) {
+      Map.Entry me = (Map.Entry) polIter.next();
+      LinkedList keyList = (LinkedList)me.getValue();
+      InterestResultPolicy pol = (InterestResultPolicy)me.getKey();
+      recoverSingleKey(r, keyList, pol, interestType, recoveredConnection,
+          isDurable, receiveValues, isFirstNewConnection);
+    }
+  }
+
+  private void recoverSingleKey(LocalRegion r, Object keys,
+      InterestResultPolicy policy, int interestType, Connection recoveredConnection,
+      boolean isDurable, boolean receiveValues, boolean isFirstNewConnection) {
+    r.startRegisterInterest();
+    try {
+      // Remove all matching values from local cache
+      if (isFirstNewConnection) {
+        // only if this recovered endpoint becomes the primary endpoint
+        r.clearKeysOfInterest(keys, interestType, policy);
+        if (logger.isDebugEnabled()) {
+          logger.debug("{}.recoverSingleRegion :Endpoint recovered is primary so clearing the keys of interest starting kind={} region={}: {}", this, InterestType.getString(interestType), r.getFullPath(), keys);
+        }
+      }
+      // Register interest, get new values back
+      List serverKeys;
+      if (policy != InterestResultPolicy.KEYS_VALUES) {
+        serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection,
+            keys, interestType, policy, isDurable, !receiveValues,
+            r.getAttributes().getDataPolicy().ordinal);
+        // Restore keys based on the server's response
+        if (isFirstNewConnection) {
+          // only if this recovered endpoint becomes the primary endpoint
+          r.refreshEntriesFromServerKeys(recoveredConnection, serverKeys, policy);
+        }
+      } else {
+        if (!isFirstNewConnection) {
+          // InterestResultPolicy.KEYS_VALUES now fetches values in
+          // RegisterInterestOp's response itself and in this case
+          // refreshEntriesFromServerKeys(...) does not explicitly fetch values
+          // but only updates keys-values to region. To not fetch values, we
+          // need to use policy NONE or KEYS.
+          serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection,
+              keys, interestType, InterestResultPolicy.NONE, isDurable, !receiveValues,
+              r.getAttributes().getDataPolicy().ordinal);
+        } else {
+          serverKeys = r.getServerProxy().registerInterestOn(recoveredConnection,
+              keys, interestType, policy, isDurable, !receiveValues,
+              r.getAttributes().getDataPolicy().ordinal);
+          r.refreshEntriesFromServerKeys(recoveredConnection, serverKeys, policy);
+        }
+      }
+    } finally {
+      r.finishRegisterInterest();
+    }
+  }
+
+  private void recoverInterestList(final Connection recoveredConnection,
+      boolean durable, boolean receiveValues, boolean isFirstNewConnection) {
+    recoverSingleList(InterestType.KEY, recoveredConnection, durable, receiveValues, isFirstNewConnection);
+    recoverSingleList(InterestType.REGULAR_EXPRESSION, recoveredConnection, durable, receiveValues, isFirstNewConnection);
+    recoverSingleList(InterestType.FILTER_CLASS, recoveredConnection, durable, receiveValues, isFirstNewConnection);
+    recoverSingleList(InterestType.OQL_QUERY, recoveredConnection, durable, receiveValues, isFirstNewConnection);
+    // VJR: Recover CQs moved to recoverAllInterestTypes() to avoid multiple
+    // calls for receiveValues flag being true and false.
+    //recoverCqs(recoveredConnection, durable);
+    //recoverSingleList(InterestType.CQ, recoveredConnection, durable,isFirstNewConnection);
+  }
+
+  protected void recoverAllInterestTypes(final Connection recoveredConnection,
+      boolean isFirstNewConnection) {
+    if (PoolImpl.BEFORE_RECOVER_INTERST_CALLBACK_FLAG) {
+      BridgeObserver bo = BridgeObserverHolder.getInstance();
+      bo.beforeInterestRecovery();
+    }
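+    // Recovery order: non-durable interest (receive-values, then no-values), then
+    // non-durable CQs; durable interest and durable CQs follow for durable clients.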
+    recoverInterestList(recoveredConnection, false, true, isFirstNewConnection);
+    recoverInterestList(recoveredConnection, false, false, isFirstNewConnection);
+    recoverCqs(recoveredConnection, false);
+    if (getPool().isDurableClient()) {
+      recoverInterestList(recoveredConnection, true, true, isFirstNewConnection);
+      recoverInterestList(recoveredConnection, true, false, isFirstNewConnection);
+      recoverCqs(recoveredConnection, true);
+    }
+  }
+
+  
+  /**
+   * A comparator that sorts queue elements so that primaries come first,
+   * followed by redundants ordered from smallest to largest queue size. For
+   * example, given a primary and redundants with queue sizes 5 and 10, the
+   * iteration order is primary, 5, 10, so lastKey() on a TreeMap using this
+   * comparator yields the redundant with the largest queue.
+   * 
+   * @author dsmith
+   */
+  protected static class QSizeComparator implements java.util.Comparator {
+    public int compare(Object o1, Object o2) {
+      ServerQueueStatus s1 = (ServerQueueStatus) o1;
+      ServerQueueStatus s2 = (ServerQueueStatus) o2;
+      // sort primaries to the front of the list
+      if (s1.isPrimary() && !s2.isPrimary()) {
+        return -1;
+      } else if (!s1.isPrimary() && s2.isPrimary()) {
+        return 1;
+      } else {
+        int diff =  s1.getServerQueueSize() - s2.getServerQueueSize();
+        if(diff != 0) {
+          return diff;
+        } else {
+          return s1.getMemberId().compareTo(s2.getMemberId());
+        }
+      }
+    }
+  }
+
+  /**
+   * A data structure holding the current set of connections. The
+   * queueConnections reference is maintained by making a copy of this data
+   * structure for each change.
+   * 
+   * Note that the order of the backups is significant. The first backup in the
+   * list is the first server that will become primary after the primary fails,
+   * and so on.
+   * 
+   * The order of backups in this list is the reverse of the order of endpoints
+   * from the old ConnectionProxyImpl.
+   */
+  public class ConnectionList implements QueueConnections {
+    private final QueueConnectionImpl primary;
+    private final Map/* <Endpoint, QueueConnection> */connectionMap;
+    private final List/* <QueueConnection> */backups;
+    /**
+     * The primaryDiscoveryException flag is stronger than just not having any
+     * queue connections: it also means we tried all of the possible queue
+     * servers and weren't able to connect.
+     */
+    private final GemFireException primaryDiscoveryException;
+    private final QueueConnectionImpl failedPrimary;
+
+    public ConnectionList() {
+      primary = null;
+      connectionMap = Collections.EMPTY_MAP;
+      backups = Collections.EMPTY_LIST;
+      primaryDiscoveryException = null;
+      failedPrimary = null;
+    }
+
+    private ConnectionList(QueueConnectionImpl primary, List backups,
+        GemFireException discoveryException, QueueConnectionImpl failedPrimary) {
+      this.primary = primary;
+      Map allConnectionsTmp = new HashMap();
+      for (Iterator itr = backups.iterator(); itr.hasNext();) {
+        QueueConnectionImpl nextConnection = (QueueConnectionImpl) itr.next();
+        allConnectionsTmp.put(nextConnection.getEndpoint(), nextConnection);
+      }
+      if (primary != null) {
+        allConnectionsTmp.put(primary.getEndpoint(), primary);
+      }
+      this.connectionMap = Collections.unmodifiableMap(allConnectionsTmp);
+      this.backups = Collections.unmodifiableList(new ArrayList(backups));
+      pool.getStats().setSubscriptionCount(connectionMap.size());
+      this.primaryDiscoveryException = discoveryException;
+      this.failedPrimary = failedPrimary;
+    }
+
+    public ConnectionList setPrimary(QueueConnectionImpl newPrimary) {
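+      // Promoting an existing backup removes it from the backup list so the
+      // same connection is not tracked twice.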
+      List newBackups = backups;
+      if (backups.contains(newPrimary)) {
+        newBackups = new ArrayList(backups);
+        newBackups.remove(newPrimary);
+      }
+      return new ConnectionList(newPrimary, newBackups, null, null);
+    }
+
+    public ConnectionList setPrimaryDiscoveryFailed(
+        GemFireException p_discoveryException) {
+      GemFireException discoveryException = p_discoveryException;
+      if(discoveryException == null) {
+        discoveryException = new NoSubscriptionServersAvailableException("Primary discovery failed.");
+      }
+      return new ConnectionList(primary, backups, discoveryException, failedPrimary);
+    }
+
+    public ConnectionList addBackup(QueueConnectionImpl queueConnection) {
+      ArrayList newBackups = new ArrayList(backups);
+      newBackups.add(queueConnection);
+      return new ConnectionList(primary, newBackups, primaryDiscoveryException, failedPrimary);
+    }
+
+    public ConnectionList removeConnection(QueueConnectionImpl connection) {
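+      // Removing the primary remembers it as failedPrimary so its updater can
+      // be handed to the next primary (see getFailedUpdater()).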
+      if (primary == connection) {
+        return new ConnectionList(null, backups, primaryDiscoveryException, primary);
+      } else {
+        ArrayList newBackups = new ArrayList(backups);
+        newBackups.remove(connection);
+        return new ConnectionList(primary, newBackups, primaryDiscoveryException, failedPrimary);
+      }
+    }
+
+    public Connection getPrimary() {
+      return primary;
+    }
+
+    public List/* <QueueConnection> */getBackups() {
+      return backups;
+    }
+    
+    /**
+     * Return the cache client updater from the previously
+     * failed primary
+     * @return the previous updater or null if there is no previous updater
+     */
+    public ClientUpdater getFailedUpdater() {
+      if(failedPrimary != null) {
+        return failedPrimary.getUpdater();
+      } else {
+        return null;
+      }
+    }
+
+    public boolean primaryDiscoveryFailed() {
+      return primaryDiscoveryException != null;
+    }
+    
+    public GemFireException getPrimaryDiscoveryException() {
+      return primaryDiscoveryException;
+    }
+
+    public QueueConnectionImpl getConnection(Endpoint endpoint) {
+      return (QueueConnectionImpl) connectionMap.get(endpoint);
+    }
+
+    /** return a copy of the list of all server locations */
+    public Set/* <ServerLocation> */getAllLocations() {
+      HashSet locations = new HashSet();
+      for (Iterator itr = connectionMap.keySet().iterator(); itr.hasNext();) {
+        com.gemstone.gemfire.cache.client.internal.Endpoint endpoint = (com.gemstone.gemfire.cache.client.internal.Endpoint) itr.next();
+        locations.add(endpoint.getLocation());
+      }
+
+      return locations;
+    }
+  }
+  
+  protected void logError(StringId message, Throwable t) {
+    if(t instanceof GemFireSecurityException) {
+      securityLogger.error(message, t);
+    } else { 
+      logger.error(message, t);
+    }
+  }
+
+  /**
+   * Asynchronous task which tries to reestablish a primary connection and
+   * satisfy redundancy requirements.
+   * 
+   * This task should only be running in a single thread at a time. It is the
+   * only way that new queue servers are added, and the only way that a backup
+   * server can transition to a primary server.
+   */
+  protected class RedundancySatisfierTask extends PoolTask {
+    private boolean isCancelled;
+    private ScheduledFuture future;
+
+    public void setFuture(ScheduledFuture future) {
+      this.future = future;
+    }
+
+    public long getRemainingDelay() {
+      return future.getDelay(TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void run2() {
+      try {
+        initializedLatch.await();
+        synchronized (lock) {
+          if (isCancelled) {
+            return;
+          } else {
+            redundancySatisfierTask = null;
+          }
+          if (pool.getPoolOrCacheCancelInProgress() != null) {
+            /* wake up waiters so they can detect cancel */
+            lock.notifyAll();
+            return;
+          }
+        }
+        Set excludedServers = queueConnections.getAllLocations();
+        excludedServers.addAll(blackList.getBadServers());
+        excludedServers.addAll(factory.getBlackList().getBadServers());
+        recoverPrimary(excludedServers);
+        recoverRedundancy(excludedServers, true);
+      } catch (VirtualMachineError err) {
+        SystemFailure.initiateFailure(err);
+        // If this ever returns, rethrow the error. We're poisoned
+        // now, so don't let this thread continue.
+        throw err;
+      } catch (CancelException e) {
+        throw e;
+      } catch (Throwable t) {
+        // Whenever you catch Error or Throwable, you must also
+        // catch VirtualMachineError (see above).  However, there is
+        // _still_ a possibility that you are dealing with a cascading
+        // error condition, so you also need to check to see if the JVM
+        // is still usable:
+        SystemFailure.checkFailure();
+        synchronized (lock) {
+          if (t instanceof GemFireSecurityException) {
+            queueConnections = queueConnections.setPrimaryDiscoveryFailed((GemFireSecurityException) t);
+          } else {
+            queueConnections = queueConnections.setPrimaryDiscoveryFailed(null);
+          }
+          lock.notifyAll();
+          pool.getCancelCriterion().checkCancelInProgress(t);
+          logError(LocalizedStrings.QueueManagerImpl_ERROR_IN_REDUNDANCY_SATISFIER, t);
+        }
+      }
+
+      scheduleRedundancySatisfierIfNeeded(redundancyRetryInterval);
+    }
+
+    public boolean cancel() {
+      synchronized (lock) {
+        if (isCancelled) {
+          return false;
+        }
+        isCancelled = true;
+        future.cancel(false);
+        redundancySatisfierTask = null;
+        return true;
+      }
+    }
+
+  }
+
+  public static void loadEmergencyClasses() {
+    QueueConnectionImpl.loadEmergencyClasses();
+  }
+}
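
The RedundancySatisfierTask above illustrates a reusable pattern: a
single-threaded, self-rescheduling recovery loop that clears its scheduled
handle before doing work and re-arms itself afterwards. A minimal sketch of
that pattern follows, assuming hypothetical names (SelfReschedulingTask,
tryRecover, retryMs) rather than GemFire API:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class SelfReschedulingTask implements Runnable {
      private final ScheduledExecutorService scheduler =
          Executors.newSingleThreadScheduledExecutor();
      private final long retryMs;
      private final Object lock = new Object();
      private boolean scheduled; // at most one pending run at a time
      private boolean cancelled;

      public SelfReschedulingTask(long retryMs) {
        this.retryMs = retryMs;
      }

      /** Schedule a run only if one is not already pending. */
      public void scheduleIfNeeded(long delayMs) {
        synchronized (lock) {
          if (!scheduled && !cancelled) {
            scheduled = true;
            scheduler.schedule(this, delayMs, TimeUnit.MILLISECONDS);
          }
        }
      }

      public void cancel() {
        synchronized (lock) {
          cancelled = true;
          scheduler.shutdownNow();
        }
      }

      @Override
      public void run() {
        synchronized (lock) {
          if (cancelled) {
            return;
          }
          scheduled = false; // clear the handle so a new run can be scheduled
        }
        if (!tryRecover()) {
          scheduleIfNeeded(retryMs); // re-arm, like the redundancy retry interval
        }
      }

      private boolean tryRecover() {
        return false; // placeholder for the recover-primary/recover-backups work
      }
    }

The single-thread executor guarantees runs never overlap, which is the same
invariant the task's class comment demands.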

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueState.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueState.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueState.java
new file mode 100755
index 0000000..e4df40a
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueState.java
@@ -0,0 +1,17 @@
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.concurrent.ScheduledExecutorService;
+import com.gemstone.gemfire.internal.cache.EventID;
+
+public interface QueueState {
+
+  public void processMarker();
+  public boolean getProcessedMarker();
+  public void incrementInvalidatedStats();
+  public boolean verifyIfDuplicate(EventID eventId, boolean addToMap);
+  public boolean verifyIfDuplicate(EventID eventId);
+  /** test hook
+   */
+  public java.util.Map getThreadIdToSequenceIdMap();
+  public void start(ScheduledExecutorService timer, int interval);
+}
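
QueueState is the duplicate-detection contract the subscription machinery
programs against: a receiver asks verifyIfDuplicate before dispatching a
server-to-client event. A self-contained sketch of that receive path, with a
stand-in DuplicateFilter in place of QueueStateImpl and String ids in place
of EventID (both substitutions are assumptions for illustration):

    import java.util.HashSet;
    import java.util.Set;

    public class ReceiverSketch {
      interface DuplicateFilter {
        boolean verifyIfDuplicate(String eventId);
      }

      private final DuplicateFilter filter;

      ReceiverSketch(DuplicateFilter filter) {
        this.filter = filter;
      }

      void onEvent(String eventId, Runnable dispatch) {
        if (filter.verifyIfDuplicate(eventId)) {
          return; // a redundant server redelivered an event we already saw
        }
        dispatch.run();
      }

      public static void main(String[] args) {
        Set<String> seen = new HashSet<>();
        // an id is a duplicate iff it was seen before; Set.add returns false then
        ReceiverSketch r = new ReceiverSketch(id -> !seen.add(id));
        r.onEvent("e1", () -> System.out.println("dispatched e1"));
        r.onEvent("e1", () -> System.out.println("never printed"));
      }
    }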

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueStateImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueStateImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueStateImpl.java
new file mode 100755
index 0000000..3dda60b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/QueueStateImpl.java
@@ -0,0 +1,440 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl.PoolTask;
+import com.gemstone.gemfire.internal.cache.BridgeObserver;
+import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+public class QueueStateImpl implements QueueState {
+  private static final Logger logger = LogService.getLogger();
+
+  protected QueueManager qManager = null;
+  private boolean processedMarker = false;
+  private final AtomicInteger invalidateCount = new AtomicInteger();
+
+  /**
+   * Maps each thread id to the latest sequence id received from it.
+   * 
+   * Keys are instances of {@link ThreadIdentifier}; values are instances of
+   * {@link SequenceIdAndExpirationObject}.
+   */
+  protected final Map threadIdToSequenceId = new LinkedHashMap();
+
+  public QueueStateImpl(QueueManager qm) {
+    this.qManager = qm;
+  }
+
+  public void processMarker() {
+    if (!this.processedMarker) {     
+      handleMarker();
+      this.processedMarker = true;
+    } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug("{}: extra marker received", this);
+      }
+    }
+  }
+
+  public boolean getProcessedMarker() {
+    return this.processedMarker;
+  }
+
+  public void handleMarker() {
+    ArrayList regions = new ArrayList();
+    Cache cache = GemFireCacheImpl.getInstance();
+    if (cache == null) {
+      return;
+    }
+
+    Set rootRegions = cache.rootRegions();
+
+    for (Iterator iter1 = rootRegions.iterator(); iter1.hasNext();) {
+      Region rootRegion = (Region) iter1.next();
+      regions.add(rootRegion);
+      try {
+        Set subRegions = rootRegion.subregions(true); // throws RDE
+        for (Iterator iter2 = subRegions.iterator(); iter2.hasNext();) {
+          regions.add(iter2.next());
+        }
+      } catch (RegionDestroyedException e) {
+        continue; // region is gone; go to the next one (bug 38705)
+      }
+    }
+
+    for (Iterator iter = regions.iterator(); iter.hasNext();) {
+      LocalRegion region = (LocalRegion) iter.next();
+      try {
+        if (region.getAttributes().getPoolName()!=null && region.getAttributes().getPoolName().equals(qManager.getPool().getName())) {
+          region.handleMarker(); // can this throw RDE??
+        }
+      }
+      catch (RegionDestroyedException e) {
+        continue; // region is gone; go to the next one (bug 38705)
+      }
+    }
+  }
+
+  public void incrementInvalidatedStats() {
+    this.invalidateCount.incrementAndGet();
+  }
+
+  public int getInvalidateCount() {
+    return this.invalidateCount.get();
+  }
+
+  /** test hook
+   */
+  public Map getThreadIdToSequenceIdMap() {
+    return this.threadIdToSequenceId;
+  }
+  
+  public boolean verifyIfDuplicate(EventID eid) {
+    return verifyIfDuplicate(eid, true);  
+  }
+  
+  public boolean verifyIfDuplicate(EventID eid, boolean addToMap) {
+    ThreadIdentifier tid = new ThreadIdentifier(eid.getMembershipID(), eid
+        .getThreadID());
+    long seqId = eid.getSequenceID();
+    SequenceIdAndExpirationObject seo = null;
+
+    // Fix 36930: save the max sequence id for each non-putAll operation's thread.
+    // There are three cases to consider when checking the tid:
+    // 1) if duplicate (putAll or non-putAll): reject
+    // 2) if not duplicate:
+    //    2.1) if putAll, check again using the real thread id;
+    //         if duplicate, reject (a non-putAll operation with a bigger
+    //         seqno has already happened), otherwise save the putAll seqno
+    //         for the real thread id and the seqno for the tid
+    //    2.2) if not putAll, check the putAll sequence id for the real
+    //         thread id; if the request's seqno is smaller, reject (a putAll
+    //         operation with a bigger seqno has already happened), otherwise
+    //         update the seqno for the tid
+    // The lock is taken to avoid concurrent modification while entries are
+    // being expired.
+    synchronized (this.threadIdToSequenceId) {
+      seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(tid);
+      if (seo != null && seo.getSequenceId() >= seqId) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(" got a duplicate entry with EventId {}. Ignoring the entry", eid);
+        }
+        seo.setAckSend(false); // bug #41289: send ack to this server since it's sending old events
+        return true;
+      }
+      else if (addToMap) {
+        ThreadIdentifier real_tid = new ThreadIdentifier(eid.getMembershipID(), 
+            ThreadIdentifier.getRealThreadIDIncludingWan(eid.getThreadID()));
+        if  (ThreadIdentifier.isPutAllFakeThreadID(eid.getThreadID())) {
+          // it's a putAll
+          seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(real_tid);
+          if (seo != null && seo.getSequenceId() >= seqId) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("got a duplicate putAll entry with eventId {}. Other operation with same thread id and bigger seqno {} has happened. Ignoring the entry", eid, seo.getSequenceId());
+            }
+            seo.setAckSend(false); // bug #41289: send ack to servers that send old events
+            return true;
+          }
+          else {
+            // save the seqno for real thread id into a putAllSequenceId 
+            this.threadIdToSequenceId.remove(real_tid);
+            this.threadIdToSequenceId.put(real_tid, seo == null? 
+                new SequenceIdAndExpirationObject(-1, seqId): 
+                new SequenceIdAndExpirationObject(seo.getSequenceId(), seqId));
+            // save seqno for tid
+            // here tid!=real_tid, for fake tid, putAllSeqno should be 0
+            this.threadIdToSequenceId.remove(tid);
+            this.threadIdToSequenceId.put(tid, new SequenceIdAndExpirationObject(seqId, -1));
+          }
+        } else {
+          // non-putAll operation:
+          // check putAllSeqno for real thread id
+          // if request's seqno is smaller, reject
+          // otherwise, update the seqno for tid
+          seo = (SequenceIdAndExpirationObject) this.threadIdToSequenceId.get(real_tid);
+          if (seo != null && seo.getPutAllSequenceId() >= seqId) {
+            if (logger.isDebugEnabled()) {
+              logger.debug("got a duplicate non-putAll entry with eventId {}. One putAll operation with same real thread id and bigger seqno {} has happened. Ignoring the entry", eid, seo.getPutAllSequenceId());
+            }
+            seo.setAckSend(false); // bug #41289: send ack to servers that send old events
+            return true;
+          }
+          else {
+            // here tid==real_tid
+            this.threadIdToSequenceId.remove(tid);
+            this.threadIdToSequenceId.put(tid, seo == null? 
+                new SequenceIdAndExpirationObject(seqId, -1):
+                new SequenceIdAndExpirationObject(seqId, seo.getPutAllSequenceId()));
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  public void start(ScheduledExecutorService timer, int interval) {
+    timer.scheduleWithFixedDelay(new ThreadIdToSequenceIdExpiryTask(),
+                              interval, interval, TimeUnit.MILLISECONDS);
+  }
+  
+  /**
+   * Periodic task which iterates over the threadIdToSequenceId map.
+   * 
+   * 1) It sends an ack to the primary server for all thread ids for which an
+   * ack has not yet been sent. 2) It expires the entries which have exceeded
+   * the specified expiry time and for which an ack has already been sent.
+   * 
+   * @author darrel
+   * @author Mitul Bid
+   * @author Suyog Bhokare
+   * @since 5.1
+   * 
+   */
+
+  private class ThreadIdToSequenceIdExpiryTask extends PoolTask {
+    /**
+     * The expiry time of the entries in the map
+     */
+    private final long expiryTime;
+
+    /**
+     * Constructs the task and initializes the expiry time.
+     */
+    public ThreadIdToSequenceIdExpiryTask() {
+      expiryTime = QueueStateImpl.this.qManager.getPool()
+          .getSubscriptionMessageTrackingTimeout();
+    }
+    
+    @Override
+    public void run2() {
+      SystemFailure.checkFailure();
+      if (qManager.getPool().getCancelCriterion().cancelInProgress() != null) {
+        return;
+      }
+      if (PoolImpl.BEFORE_SENDING_CLIENT_ACK_CALLBACK_FLAG) {
+        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        bo.beforeSendingClientAck();
+      }
+      sendPeriodicAck();
+      checkForExpiry();
+    }
+
+
+    void checkForExpiry() {
+      synchronized (threadIdToSequenceId) {
+        Iterator iterator = threadIdToSequenceId.entrySet().iterator();
+        long currentTime = System.currentTimeMillis();
+        Map.Entry entry;
+        SequenceIdAndExpirationObject seo;
+
+        while (iterator.hasNext()) {
+          entry = (Map.Entry) iterator.next();
+          seo = (SequenceIdAndExpirationObject) entry.getValue();
+          if ((currentTime - seo.getCreationTime() > this.expiryTime)) {
+            if (seo.getAckSend()
+                || (qManager.getPool().getSubscriptionRedundancy() == 0 && !qManager.getPool().isDurableClient())) {
+              iterator.remove();
+            }
+          } else {
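+            // threadIdToSequenceId is a LinkedHashMap in insertion order and
+            // creationTime is assigned when an entry is (re)inserted, so the
+            // first unexpired entry means all later entries are unexpired too.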
+            break;
+          }
+        }
+      }
+    }
+
+    /**
+     * Sends a periodic ack to the primary server for all thread ids for
+     * which an ack has not yet been sent.
+     */
+    void sendPeriodicAck() {
+      List events = new ArrayList();
+      boolean success = false;
+      synchronized (threadIdToSequenceId) {
+        Iterator iterator = threadIdToSequenceId.entrySet().iterator();
+        while (iterator.hasNext()) {
+          Map.Entry entry = (Map.Entry) iterator.next();
+          SequenceIdAndExpirationObject seo = (SequenceIdAndExpirationObject) entry
+              .getValue();
+          if (!seo.getAckSend()) {
+            ThreadIdentifier tid = (ThreadIdentifier) entry.getKey();
+            events.add(new EventID(tid.getMembershipID(), tid.getThreadID(),
+                seo.getSequenceId()));
+            seo.setAckSend(true);
+            // entry.setValue(entry);
+          }// if ends
+        }// while ends
+      }// synchronized ends
+
+      if (events.size() > 0) {
+        try {
+          PrimaryAckOp.execute(qManager.getAllConnections().getPrimary(), qManager.getPool(), events);
+          success = true;
+        } catch (Exception ex) {
+          if (logger.isDebugEnabled())
+            logger.debug("Exception while sending an ack to the primary server: {}", ex);
+        } finally {
+          if (!success) {
+            Iterator iter = events.iterator();
+            while (iter.hasNext()) {
+              EventID eid = (EventID) iter.next();
+              ThreadIdentifier tid = new ThreadIdentifier(
+                  eid.getMembershipID(), eid.getThreadID());
+              synchronized (threadIdToSequenceId) {
+                SequenceIdAndExpirationObject seo = (SequenceIdAndExpirationObject) threadIdToSequenceId
+                    .get(tid);
+                if (seo != null && seo.getAckSend()) {
+                  seo = (SequenceIdAndExpirationObject) threadIdToSequenceId
+                      .remove(tid);
+                  if (seo != null) {
+                    // put back the old seqId with a new time stamp
+                    SequenceIdAndExpirationObject siaeo = new SequenceIdAndExpirationObject(
+                        seo.getSequenceId(), seo.getPutAllSequenceId());
+                    threadIdToSequenceId.put(tid, siaeo);
+                  }
+                }// if ends
+              }// synchronized ends
+            }// while ends
+          }// if(!success) ends
+        }// finally ends
+      }// if(events.size() > 0)ends
+    }// method ends
+  }
+
+  /**
+   * A class to store a sequence id along with the creation time of the
+   * object, used to expire the entry.
+   * 
+   * @author Mitul Bid
+   * @since 5.1
+   * 
+   */
+  public static class SequenceIdAndExpirationObject {
+    /** The sequence id of the entry */
+    private final long sequenceId;
+    /** The sequence id of the putAll operations */
+    private final long putAllSequenceId;
+    /** The creation time of the object */
+    private final long creationTime;
+    /** Whether the client ack has been sent to the server */
+    private boolean ackSend;
+
+    SequenceIdAndExpirationObject(long sequenceId, long putAllSequenceId) {
+      this.sequenceId = sequenceId;
+      this.putAllSequenceId = putAllSequenceId;
+      this.creationTime = System.currentTimeMillis();
+      this.ackSend = false;
+    }
+
+    /**
+     * @return Returns the creationTime.
+     */
+    public final long getCreationTime() {
+      return creationTime;
+    }
+
+    /**
+     * @return Returns the sequenceId.
+     */
+    public final long getSequenceId() {
+      return sequenceId;
+    }
+
+    /**
+     * @return Returns the putAllSequenceId.
+     */
+    public final long getPutAllSequenceId() {
+      return putAllSequenceId;
+    }
+
+    /**
+     * @return Returns the ackSend
+     */
+    public boolean getAckSend() {
+      return ackSend;
+    }
+
+    /**
+     * Sets the ackSend
+     * 
+     * @param ackSend true if the ack has been sent to the server
+     */
+    public void setAckSend(boolean ackSend) {
+      this.ackSend = ackSend;
+    }
+
+    @Override
+    public String toString() {
+      StringBuffer sb = new StringBuffer();
+      sb.append("SequenceIdAndExpirationObject[");
+      sb.append("ackSend = " + this.ackSend);
+      sb.append("; creation = " + creationTime);
+      sb.append("; seq = " + sequenceId);
+      sb.append("; putAll seq = " + putAllSequenceId);
+      sb.append("]");
+      return sb.toString();
+    }
+  }
+
+}
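
Per real thread, the three-case check in verifyIfDuplicate reduces to two
monotonically increasing counters: the largest non-putAll seqno and the
largest putAll seqno, where an incoming operation is rejected if either
counter has already reached its seqno. A self-contained model of that
decision follows (DedupSketch and the isPutAll flag are illustrative; the
real code keys a shared map by fake and real ThreadIdentifiers):

    public class DedupSketch {
      private long seq = -1;       // largest non-putAll seqno seen
      private long putAllSeq = -1; // largest putAll seqno seen

      boolean isDuplicate(long seqId, boolean isPutAll) {
        // case 1: an operation of the same kind already reached this seqno
        long sameKind = isPutAll ? putAllSeq : seq;
        if (sameKind >= seqId) {
          return true;
        }
        // cases 2.1 / 2.2: an operation of the other kind on the same real
        // thread already reached an equal or bigger seqno
        long otherKind = isPutAll ? seq : putAllSeq;
        if (otherKind >= seqId) {
          return true;
        }
        // accept and record
        if (isPutAll) {
          putAllSeq = seqId;
        } else {
          seq = seqId;
        }
        return false;
      }

      public static void main(String[] args) {
        DedupSketch d = new DedupSketch();
        System.out.println(d.isDuplicate(5, false)); // false: first operation
        System.out.println(d.isDuplicate(5, true));  // true: non-putAll 5 wins
        System.out.println(d.isDuplicate(6, true));  // false: newer putAll
        System.out.println(d.isDuplicate(6, false)); // true: putAll 6 wins
      }
    }

sendPeriodicAck above also shows a reusable shape: mark entries acked inside
the lock, send the batch outside the lock, and roll the marks back if the
send fails so the next run retries. A hedged sketch of that shape, with a
Predicate standing in for the PrimaryAckOp.execute call:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Predicate;

    public class BatchAckSketch {
      static class Entry {
        boolean acked;
      }

      private final Map<String, Entry> tracked = new HashMap<>();

      void sendPeriodicAck(Predicate<List<String>> send) {
        List<String> batch = new ArrayList<>();
        synchronized (tracked) {
          for (Map.Entry<String, Entry> e : tracked.entrySet()) {
            if (!e.getValue().acked) {
              e.getValue().acked = true; // optimistic: assume the send works
              batch.add(e.getKey());
            }
          }
        }
        // the send happens outside the lock so a slow server cannot block
        // threads recording new events
        if (!batch.isEmpty() && !send.test(batch)) {
          synchronized (tracked) {
            for (String id : batch) {
              Entry e = tracked.get(id);
              if (e != null) {
                e.acked = false; // roll back so the next run retries the ack
              }
            }
          }
        }
      }
    }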

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
new file mode 100644
index 0000000..8d598d1
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/ReadyForEventsOp.java
@@ -0,0 +1,82 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * Tells the server we are ready to receive server-to-client events
+ * from durable subscriptions.
+ * @author darrel
+ * @since 5.7
+ */
+public class ReadyForEventsOp {
+  /**
+   * Tells the primary server we are ready to receive server-to-client events
+   * from durable subscriptions.
+   * @param pool the pool to use to communicate with the server.
+   * @param primary the primary queue connection
+   */
+  public static void execute(ExecutablePool pool, QueueConnectionImpl primary)
+  {
+    AbstractOp op = new ReadyForEventsOpImpl();
+    pool.executeOn(primary, op);
+  }
+
+  private ReadyForEventsOp() {
+    // no instances allowed
+  }
+  
+  private static class ReadyForEventsOpImpl extends AbstractOp {
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public ReadyForEventsOpImpl() {
+      super(MessageType.CLIENT_READY, 1);
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
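+      // Keep only the secure-part flag in the early-ack byte; all other
+      // early-ack bits are cleared before the message is sent.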
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "readyForEvents");
+      return null;
+    }
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startReadyForEvents();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endReadyForEventsSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endReadyForEvents(start, hasTimedOut(), hasFailed());
+    }
+  }
+}
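
ReadyForEventsOp is one of the simplest instances of the AbstractOp template:
the op encapsulates a single message exchange while the pool decides which
connection runs it and records the stats. A toy model of that split (Op and
Pool here are illustrative interfaces, not the GemFire types):

    public class OpPatternSketch {
      interface Op {
        Object attempt(String connection) throws Exception;
      }

      interface Pool {
        Object execute(Op op) throws Exception;
      }

      public static void main(String[] args) throws Exception {
        Pool pool = op -> op.attempt("primary-connection"); // trivial routing
        Op readyForEvents = cnx -> {
          System.out.println("CLIENT_READY sent on " + cnx);
          return null; // ack-only reply, mirroring processResponse above
        };
        pool.execute(readyForEvents);
      }
    }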

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/19459053/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
new file mode 100644
index 0000000..c3ab84b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/cache/client/internal/RegisterDataSerializersOp.java
@@ -0,0 +1,129 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.io.IOException;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.SerializationException;
+import com.gemstone.gemfire.internal.InternalDataSerializer.SerializerAttributesHolder;
+import com.gemstone.gemfire.internal.cache.BridgeObserver;
+import com.gemstone.gemfire.internal.cache.BridgeObserverHolder;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.util.BlobHelper;
+
+public class RegisterDataSerializersOp {
+
+  public static void execute(ExecutablePool pool,
+      DataSerializer[] dataSerializers, EventID eventId) {
+    AbstractOp op = new RegisterDataSerializersOpImpl(dataSerializers,
+        eventId);
+    pool.execute(op);
+  }
+  
+  public static void execute(ExecutablePool pool,
+      SerializerAttributesHolder[] holders, EventID eventId) {
+    AbstractOp op = new RegisterDataSerializersOpImpl(holders,
+        eventId);
+    pool.execute(op);
+  }
+  
+  private RegisterDataSerializersOp() {
+    // no instances allowed
+  }
+  
+  private static class RegisterDataSerializersOpImpl extends AbstractOp {
+
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization fails
+     */
+    public RegisterDataSerializersOpImpl(DataSerializer[] dataSerializers,
+        EventID eventId) {
+      super(MessageType.REGISTER_DATASERIALIZERS, dataSerializers.length * 2 + 1);
+      for(int i = 0; i < dataSerializers.length; i++) {
+        DataSerializer dataSerializer = dataSerializers[i];
+        // Class.toString() returns "class <name>", so strip the leading
+        // "class " prefix to get the bare class name
+        String className = dataSerializer.getClass().toString().substring(6);
+        try {
+          getMessage().addBytesPart(BlobHelper.serializeToBlob(className));
+        } catch (IOException ex) {
+          throw new SerializationException("failed serializing object", ex);
+        }
+        getMessage().addIntPart(dataSerializer.getId());
+      }
+      getMessage().addBytesPart(eventId.calcBytes());
+      // // CALLBACK FOR TESTING PURPOSE ONLY ////
+      if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        bo.beforeSendingToServer(eventId);
+      }
+   }
+    
+    /**
+     * @throws SerializationException
+     *           Thrown when serialization fails.
+     */
+    public RegisterDataSerializersOpImpl(SerializerAttributesHolder[] holders,
+        EventID eventId) {
+      super(MessageType.REGISTER_DATASERIALIZERS, holders.length * 2 + 1);
+      for (int i = 0; i < holders.length; i++) {
+        try {
+          getMessage().addBytesPart(
+              BlobHelper.serializeToBlob(holders[i].getClassName()));
+        } catch (IOException ex) {
+          throw new SerializationException("failed serializing object", ex);
+        }
+        getMessage().addIntPart(holders[i].getId());
+      }
+      getMessage().addBytesPart(eventId.calcBytes());
+      // // CALLBACK FOR TESTING PURPOSE ONLY ////
+      if (PoolImpl.IS_INSTANTIATOR_CALLBACK) {
+        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        bo.beforeSendingToServer(eventId);
+      }
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      processAck(msg, "registerDataSerializers");
+      return null;
+    }
+    
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startRegisterDataSerializers();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endRegisterDataSerializersSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endRegisterDataSerializers(start, hasTimedOut(), hasFailed());
+    }
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+  }
+}
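
Both constructors above lay the message out as (class-name blob, id) pairs
followed by a single trailing event-id part, i.e. 2*n + 1 parts for n
serializers, which matches the part count passed to super(). A runnable
illustration of that layout, with a plain List standing in for the internal
Message parts (an assumption made for clarity):

    import java.util.ArrayList;
    import java.util.List;

    public class PartsLayoutSketch {
      public static void main(String[] args) {
        String[] classNames = { "com.example.FooSerializer",
                                "com.example.BarSerializer" };
        int[] ids = { 41, 42 };
        List<Object> parts = new ArrayList<>();
        for (int i = 0; i < classNames.length; i++) {
          parts.add(classNames[i]); // serialized class-name blob in the real op
          parts.add(ids[i]);        // the serializer's registration id
        }
        parts.add("event-id-bytes"); // eventId.calcBytes() in the real op
        System.out.println(parts.size() + " parts for "
            + classNames.length + " serializers"); // prints "5 parts for 2 serializers"
      }
    }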