You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by vf...@apache.org on 2015/11/25 20:07:54 UTC

[41/50] [abbrv] incubator-geode git commit: GEODE-243: remove deprecated Bridge feature

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java
new file mode 100644
index 0000000..646f336
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java
@@ -0,0 +1,164 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.server.ServerLoad;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+
+
+/**
+ * Used to give advice to a cache server.
+ * <p>
+ * Incoming cache server profiles are handed to the controllers and to the
+ * other cache servers that exist in the receiving vm (see
+ * {@link CacheServerProfile#processIncoming}).
+ *
+ * @author darrel
+ */
+public class CacheServerAdvisor extends GridAdvisor {
+
+  private CacheServerAdvisor(DistributionAdvisee server) {
+    super(server);
+  }
+
+  /**
+   * Creates the advisor for the given cache server and initializes it.
+   *
+   * @param server the advisee this advisor belongs to
+   * @return the initialized advisor
+   */
+  public static CacheServerAdvisor createCacheServerAdvisor(DistributionAdvisee server) {
+    CacheServerAdvisor advisor = new CacheServerAdvisor(server);
+    advisor.initialize();
+    return advisor;
+  }
+
+  @Override
+  public String toString() {
+    return "CacheServerAdvisor for " + getAdvisee().getFullPath();
+  }
+
+  /** Instantiate new distribution profile for this member */
+  @Override
+  protected Profile instantiateProfile(
+      InternalDistributedMember memberId, int version) {
+    return new CacheServerProfile(memberId, version);
+  }
+
+  /**
+   * Describes a cache server for distribution purposes.
+   */
+  public static class CacheServerProfile extends GridAdvisor.GridProfile {
+    private String[] groups;
+    private int maxConnections;
+    private ServerLoad initialLoad;
+    private long loadPollInterval;
+
+    /** for internal use, required for DataSerializer.readObject */
+    public CacheServerProfile() {
+    }
+
+    public CacheServerProfile(InternalDistributedMember memberId, int version) {
+      super(memberId, version);
+    }
+
+    /**
+     * Copy constructor. Every field declared by this class is copied; the
+     * groups array and the initial load are shared with <code>toCopy</code>,
+     * not cloned.
+     *
+     * @param toCopy the profile to copy
+     */
+    public CacheServerProfile(CacheServerProfile toCopy) {
+      super(toCopy);
+      this.groups = toCopy.groups;
+      // previously only the groups were copied, which silently reset the
+      // remaining fields of the copy to their default values
+      this.maxConnections = toCopy.maxConnections;
+      this.initialLoad = toCopy.initialLoad;
+      this.loadPollInterval = toCopy.loadPollInterval;
+    }
+
+    /** don't modify the returned array! */
+    public String[] getGroups() {
+      return this.groups;
+    }
+    public void setGroups(String[] groups) {
+      this.groups = groups;
+    }
+
+    public ServerLoad getInitialLoad() {
+      return initialLoad;
+    }
+
+    public int getMaxConnections() {
+      return maxConnections;
+    }
+
+    public void setMaxConnections(int maxConnections) {
+      this.maxConnections = maxConnections;
+    }
+
+    public void setInitialLoad(ServerLoad initialLoad) {
+      this.initialLoad = initialLoad;
+    }
+    public long getLoadPollInterval() {
+      return this.loadPollInterval;
+    }
+    public void setLoadPollInterval(long v) {
+      this.loadPollInterval = v;
+    }
+
+    /**
+     * Used to process an incoming cache server profile. Any controller in this
+     * vm needs to be told about this incoming new cache server. The reply
+     * needs to contain any controller(s) that exist in this vm.
+     * 
+     * @since 5.7
+     */
+    @Override
+    public void processIncoming(DistributionManager dm, String adviseePath,
+        boolean removeProfile, boolean exchangeProfiles,
+        final List<Profile> replyProfiles) {
+      // tell local controllers about this cache server
+      tellLocalControllers(removeProfile, exchangeProfiles, replyProfiles);
+      // for QRM messaging we need cache servers to know about each other
+      tellLocalBridgeServers(removeProfile, exchangeProfiles, replyProfiles);
+    }
+
+    @Override
+    public int getDSFID() {
+      return CACHE_SERVER_PROFILE;
+    }
+
+    /**
+     * Writes the superclass state followed by groups, maxConnections,
+     * initialLoad and loadPollInterval. {@link #fromData} reads the fields
+     * back in exactly this order.
+     */
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      super.toData(out);
+      DataSerializer.writeStringArray(this.groups, out);
+      out.writeInt(maxConnections);
+      InternalDataSerializer.invokeToData(initialLoad, out);
+      out.writeLong(getLoadPollInterval());
+    }
+
+    /**
+     * Reads the state written by {@link #toData}; the field order must match.
+     */
+    @Override
+    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+      super.fromData(in);
+      this.groups = DataSerializer.readStringArray(in);
+      this.maxConnections = in.readInt();
+      this.initialLoad = new ServerLoad();
+      InternalDataSerializer.invokeFromData(initialLoad, in);
+      setLoadPollInterval(in.readLong());
+    }
+
+    @Override
+    public StringBuilder getToStringHeader() {
+      return new StringBuilder("BridgeServerProfile");
+    }
+
+    /**
+     * Appends the fields of this profile to <code>sb</code>. The groups are
+     * skipped when they are <code>null</code>; the other fields are always
+     * appended.
+     */
+    @Override
+    public void fillInToString(StringBuilder sb) {
+      super.fillInToString(sb);
+      if (this.groups != null) {
+        sb.append("; groups=").append(Arrays.asList(this.groups));
+      }
+      // only the groups need the null guard (Arrays.asList rejects null);
+      // the remaining fields used to be hidden whenever groups was null
+      sb.append("; maxConnections=").append(maxConnections);
+      sb.append("; initialLoad=").append(initialLoad);
+      sb.append("; loadPollInterval=").append(getLoadPollInterval());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
new file mode 100644
index 0000000..422711e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
@@ -0,0 +1,812 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.InvalidValueException;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.ClientSession;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.DynamicRegionFactory;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.ExpirationAction;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.InterestRegistrationListener;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
+import com.gemstone.gemfire.cache.server.ServerLoadProbe;
+import com.gemstone.gemfire.cache.server.internal.LoadMonitor;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.OSProcess;
+import com.gemstone.gemfire.internal.admin.ClientHealthMonitoringRegion;
+import com.gemstone.gemfire.internal.cache.CacheServerAdvisor.CacheServerProfile;
+import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
+import com.gemstone.gemfire.internal.cache.tier.Acceptor;
+import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.management.membership.ClientMembership;
+import com.gemstone.gemfire.management.membership.ClientMembershipListener;
+
+/**
+ * An implementation of the <code>CacheServer</code> interface that delegates
+ * most of the heavy lifting to an {@link Acceptor}.
+ * 
+ * @author David Whitlock
+ * @since 4.0
+ */
+@SuppressWarnings("deprecation")
+public class CacheServerImpl
+  extends AbstractCacheServer
+  implements DistributionAdvisee {
+
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * Value of the system property
+   * <code>gemfire.BridgeServer.FORCE_LOAD_UPDATE_FREQUENCY</code> (default 10);
+   * handed to the {@link LoadMonitor} created in {@link #start()}.
+   */
+  private static final int FORCE_LOAD_UPDATE_FREQUENCY= Integer.getInteger("gemfire.BridgeServer.FORCE_LOAD_UPDATE_FREQUENCY", 10).intValue();
+
+  /** The acceptor that does the actual serving */
+  private volatile AcceptorImpl acceptor;
+
+  /**
+   * The advisor used by this cache server.
+   * @since 5.7
+   */
+  private volatile CacheServerAdvisor advisor;
+
+  /**
+   * The monitor used to monitor load on this
+   * cache server and distribute load to the locators
+   * @since 5.7
+   */
+  private volatile LoadMonitor loadMonitor;
+
+  /**
+   * boolean that represents whether this server is a GatewayReceiver or a simple cache server
+   */
+  private boolean isGatewayReceiver;
+
+  /**
+   * Transport filters handed to the acceptor. Collections.emptyList() is used
+   * instead of the raw Collections.EMPTY_LIST constant so that the assignment
+   * is type-safe.
+   */
+  private List<GatewayTransportFilter> gatewayTransportFilters = Collections.emptyList();
+
+  /**
+   * Needed because this guy is an advisee
+   * @since 5.7
+   */
+  private int serialNumber; // changed on each start
+
+  /**
+   * Value of the system property
+   * <code>gemfire.cache-server.enable-notify-by-subscription-false</code>.
+   * {@link #setNotifyBySubscription(boolean)} only stores its argument when
+   * this flag is set.
+   */
+  public static final boolean ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE = 
+  Boolean.getBoolean("gemfire.cache-server.enable-notify-by-subscription-false");
+
+  
+ 
+  // ////////////////////// Constructors //////////////////////
+
+  /**
+   * Creates a new <code>CacheServerImpl</code> that serves the contents of
+   * the given <code>Cache</code>. It has the default configuration.
+   *
+   * @param cache the cache whose contents are served
+   * @param isGatewayReceiver whether this server is a GatewayReceiver
+   */
+  public CacheServerImpl(GemFireCacheImpl cache, boolean isGatewayReceiver) {
+    super(cache);
+    this.isGatewayReceiver = isGatewayReceiver;
+  }
+
+  // //////////////////// Instance Methods ///////////////////
+  
+  /**
+   * Returns the cancel criterion of the cache served by this server.
+   */
+  public CancelCriterion getCancelCriterion() {
+    return cache.getCancelCriterion();    
+  }
+
+  /**
+   * Guard used by the configuration setters: the configuration of a cache
+   * server may only be changed while the server is not running.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  private void checkRunning() {
+    if (!isRunning()) {
+      return;
+    }
+    throw new IllegalStateException(
+        LocalizedStrings.CacheServerImpl_A_CACHE_SERVERS_CONFIGURATION_CANNOT_BE_CHANGED_ONCE_IT_IS_RUNNING
+            .toLocalizedString());
+  }
+
+  /**
+   * Returns the flag passed to the constructor: whether this server is a
+   * GatewayReceiver.
+   */
+  public boolean isGatewayReceiver() {
+    return this.isGatewayReceiver;
+  }
+  
+  @Override
+  public int getPort() {
+    if (this.acceptor != null) {
+      return this.acceptor.getPort();
+    }
+    else {
+      return super.getPort();
+    }
+  }
+
+  /**
+   * Sets the port after verifying that the server is not running.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  @Override
+  public void setPort(int port) {
+    checkRunning();
+    super.setPort(port);
+  }
+
+  /**
+   * Sets the bind address after verifying that the server is not running.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  @Override
+  public void setBindAddress(String address) {
+    checkRunning();
+    super.setBindAddress(address);
+  }
+  /**
+   * Sets the hostname for clients after verifying that the server is not
+   * running.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  @Override
+  public void setHostnameForClients(String name) {
+    checkRunning();
+    super.setHostnameForClients(name);
+  }
+
+  /**
+   * Sets the maximum number of connections after verifying that the server is
+   * not running.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  @Override
+  public void setMaxConnections(int maxCon) {
+    checkRunning();
+    super.setMaxConnections(maxCon);
+  }
+
+  /**
+   * Sets the maximum number of threads after verifying that the server is not
+   * running.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  @Override
+  public void setMaxThreads(int maxThreads) {
+    checkRunning();
+    super.setMaxThreads(maxThreads);
+  }
+
+  /**
+   * Stores the notify-by-subscription flag, but only when
+   * {@link #ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE} is set; otherwise the
+   * argument is ignored.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  @Override
+  public void setNotifyBySubscription(boolean b) {
+    checkRunning();
+    if (!ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE) {
+      return;
+    }
+    this.notifyBySubscription = b;
+  }
+
+  /**
+   * Sets the maximum message count after verifying that the server is not
+   * running.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  @Override
+  public void setMaximumMessageCount(int maximumMessageCount) {
+    checkRunning();
+    super.setMaximumMessageCount(maximumMessageCount);
+  }
+
+  /**
+   * Stores the socket buffer size.
+   */
+  // NOTE(review): unlike most other setters in this class this one does not
+  // call checkRunning() - confirm whether that is intentional.
+  @Override
+  public void setSocketBufferSize(int socketBufferSize) {
+    this.socketBufferSize = socketBufferSize;
+  }
+
+  /**
+   * Returns the configured socket buffer size.
+   */
+  @Override
+  public int getSocketBufferSize() {
+    return this.socketBufferSize;
+  }
+  
+  /**
+   * Stores the maximum time between pings.
+   */
+  // NOTE(review): unlike most other setters in this class this one does not
+  // call checkRunning() - confirm whether that is intentional.
+  @Override
+  public void setMaximumTimeBetweenPings(int maximumTimeBetweenPings) {
+    this.maximumTimeBetweenPings = maximumTimeBetweenPings;
+  }
+
+  /**
+   * Returns the configured maximum time between pings.
+   */
+  @Override
+  public int getMaximumTimeBetweenPings() {
+    return this.maximumTimeBetweenPings;
+  }
+
+
+  /**
+   * Sets the load poll interval after verifying that the server is not
+   * running.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  @Override
+  public void setLoadPollInterval(long loadPollInterval) {
+    checkRunning();
+    super.setLoadPollInterval(loadPollInterval);
+  }
+
+  /**
+   * Returns the configured maximum message count.
+   */
+  @Override
+  public int getMaximumMessageCount() {
+    return this.maximumMessageCount;
+  }
+
+  /**
+   * Sets the load probe after verifying that the server is not running.
+   *
+   * @throws IllegalStateException if this cache server is running
+   */
+  @Override
+  public void setLoadProbe(ServerLoadProbe loadProbe) {
+    checkRunning();
+    super.setLoadProbe(loadProbe);
+  }
+
+  /**
+   * Stores the transport filters that are handed to the acceptor when this
+   * server is started. The list is stored as is, without copying.
+   *
+   * @param transportFilters the filters to use
+   */
+  public void setGatewayTransportFilter(
+      List<GatewayTransportFilter> transportFilters) {
+    this.gatewayTransportFilters = transportFilters;
+  }
+  
+  /**
+   * Returns the configured message time to live.
+   */
+  @Override
+  public int getMessageTimeToLive() {
+    return this.messageTimeToLive;
+  }
+  
+
+  /**
+   * Returns the client subscription configuration of this server. The stored
+   * instance itself is returned, not a copy.
+   */
+  public ClientSubscriptionConfig getClientSubscriptionConfig(){
+    return this.clientSubscriptionConfig;
+  }
+
+  /**
+   * Copies the configuration of <b>another</b> <code>CacheServer</code> into
+   * <b>this</b> one by calling the individual setters, then copies the client
+   * subscription settings (eviction policy, capacity and either the disk store
+   * name or the overflow directory).
+   *
+   * @param other the cache server whose configuration is copied
+   */
+  public void configureFrom(CacheServer other) {
+    // connection related settings
+    setPort(other.getPort());
+    setBindAddress(other.getBindAddress());
+    setHostnameForClients(other.getHostnameForClients());
+    setMaxConnections(other.getMaxConnections());
+    setMaxThreads(other.getMaxThreads());
+    setNotifyBySubscription(other.getNotifyBySubscription());
+    setSocketBufferSize(other.getSocketBufferSize());
+    setTcpNoDelay(other.getTcpNoDelay());
+    setMaximumTimeBetweenPings(other.getMaximumTimeBetweenPings());
+    setMaximumMessageCount(other.getMaximumMessageCount());
+    setMessageTimeToLive(other.getMessageTimeToLive());
+//    setTransactionTimeToLive(other.getTransactionTimeToLive());  not implemented in CacheServer for v6.6
+
+    // grouping and load related settings
+    setGroups(other.getGroups());
+    setLoadProbe(other.getLoadProbe());
+    setLoadPollInterval(other.getLoadPollInterval());
+
+    // added for configuration of ha overflow
+    final ClientSubscriptionConfig source = other.getClientSubscriptionConfig();
+    final ClientSubscriptionConfig target = this.getClientSubscriptionConfig();
+    target.setEvictionPolicy(source.getEvictionPolicy());
+    target.setCapacity(source.getCapacity());
+    final String sourceDiskStore = source.getDiskStoreName();
+    if (sourceDiskStore == null) {
+      target.setOverflowDirectory(source.getOverflowDirectory());
+    } else {
+      target.setDiskStoreName(sourceDiskStore);
+    }
+  }
+
+  /**
+   * Starts this cache server. In order, this method
+   * <ol>
+   * <li>assigns a new serial number,</li>
+   * <li>forces notifyBySubscription to true when the dynamic region factory is
+   * open,</li>
+   * <li>creates the advisor and the load monitor,</li>
+   * <li>creates and starts the acceptor,</li>
+   * <li>performs the advisor handshake and starts the load monitor,</li>
+   * <li>obtains the client health monitoring region, logs the configuration
+   * and registers the client membership listener if it is not registered
+   * yet,</li>
+   * <li>reports <code>CACHE_SERVER_START</code> to the distributed system,
+   * unless this server is a gateway receiver.</li>
+   * </ol>
+   *
+   * @throws IOException declared by this method; presumably raised while
+   *         creating or starting the acceptor - confirm
+   */
+  // NOTE(review): nothing here checks isRunning(), so a second start() on a
+  // running server is not guarded by this method - confirm callers prevent it.
+  @Override
+  public synchronized void start() throws IOException {
+    Assert.assertTrue(this.cache != null);
+    boolean isSqlFabricSystem = ((GemFireCacheImpl)this.cache).isSqlfSystem();
+    
+    this.serialNumber = createSerialNumber();
+    if (DynamicRegionFactory.get().isOpen()) {
+      // force notifyBySubscription to be true so that meta info is pushed
+      // from servers to clients instead of invalidates.
+      if (!this.notifyBySubscription) {
+        logger.info(LocalizedMessage.create(LocalizedStrings.CacheServerImpl_FORCING_NOTIFYBYSUBSCRIPTION_TO_SUPPORT_DYNAMIC_REGIONS));
+        this.notifyBySubscription = true;
+      }
+    }
+    this.advisor = CacheServerAdvisor.createCacheServerAdvisor(this);
+    this.loadMonitor = new LoadMonitor(loadProbe, maxConnections,
+        loadPollInterval, FORCE_LOAD_UPDATE_FREQUENCY, 
+        advisor);
+    // positional list handed to the acceptor: 0 = eviction policy,
+    // 1 = capacity, 2 = port, 3 = disk store name or overflow directory,
+    // 4 = whether entry 3 is a disk store name
+    List overflowAttributesList = new LinkedList();
+    ClientSubscriptionConfig csc = this.getClientSubscriptionConfig();
+    overflowAttributesList.add(0, csc.getEvictionPolicy());
+    overflowAttributesList.add(1, Integer.valueOf(csc.getCapacity()));
+    overflowAttributesList.add(2, Integer.valueOf(this.port));
+    String diskStoreName = csc.getDiskStoreName();
+    if (diskStoreName != null) {
+      overflowAttributesList.add(3, diskStoreName);
+      overflowAttributesList.add(4, true); // indicator to use diskstore
+    } else {
+      overflowAttributesList.add(3, csc.getOverflowDirectory());
+      overflowAttributesList.add(4, false);
+    }
+
+    this.acceptor = new AcceptorImpl(getPort(), 
+                                     getBindAddress(),
+                                     getNotifyBySubscription(),
+                                     getSocketBufferSize(), 
+                                     getMaximumTimeBetweenPings(), 
+                                     this.cache,
+                                     getMaxConnections(), 
+                                     getMaxThreads(), 
+                                     getMaximumMessageCount(),
+                                     getMessageTimeToLive(),
+                                     getTransactionTimeToLive(),
+                                     this.loadMonitor,
+                                     overflowAttributesList, 
+                                     isSqlFabricSystem,
+                                     this.isGatewayReceiver,
+                                     this.gatewayTransportFilters, this.tcpNoDelay);
+
+    this.acceptor.start();
+    this.advisor.handshake();
+    this.loadMonitor.start(new ServerLocation(getExternalAddress(),
+        getPort()), acceptor.getStats());
+    
+    // TODO : Need to provide facility to enable/disable client health monitoring.
+    //Creating ClientHealthMonitoring region.
+    // Force initialization on current cache
+    if(cache instanceof GemFireCacheImpl) {
+      ClientHealthMonitoringRegion.getInstance((GemFireCacheImpl)cache);
+    }
+    this.cache.getLoggerI18n().config(LocalizedStrings.CacheServerImpl_CACHESERVER_CONFIGURATION___0, getConfig());
+    
+    /* 
+     * If the stopped cache server is restarted, we'll need to re-register the 
+     * client membership listener. If the listener is already registered it 
+     * won't be registered again, as is the case when start() is invoked for the 
+     * first time.  
+     */
+    ClientMembershipListener[] membershipListeners = 
+                                ClientMembership.getClientMembershipListeners();
+    
+    boolean membershipListenerRegistered = false;
+    for (ClientMembershipListener membershipListener : membershipListeners) {
+      //just checking by reference as the listener instance is final
+      if (listener == membershipListener) {
+        membershipListenerRegistered = true;
+        break;
+      }
+    }
+    
+    if (!membershipListenerRegistered) {
+      ClientMembership.registerClientMembershipListener(listener);
+    }
+    
+    // gateway receivers do not report a CACHE_SERVER_START resource event
+    if (!isGatewayReceiver) {
+      InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.CACHE_SERVER_START, this);
+    }
+    
+  }
+
+  
+  /**
+   * Gets the address that this cache server can be contacted on from external
+   * processes. Equivalent to <code>getExternalAddress(true)</code>, i.e. the
+   * server must be running.
+   *
+   * @throws IllegalStateException if this cache server is not running
+   * @since 5.7
+   */
+  public String getExternalAddress() {
+    return getExternalAddress(true);
+  }
+  
+  public String getExternalAddress(boolean checkServerRunning) {
+    if (checkServerRunning) {
+      if (!this.isRunning()) {
+        String s = "A bridge server's bind address is only available if it has been started";
+        this.cache.getCancelCriterion().checkCancelInProgress(null);
+        throw new IllegalStateException(s);
+      }
+    }
+    if (this.hostnameForClients == null || this.hostnameForClients.equals("")) {
+      if (this.acceptor != null) {
+        return this.acceptor.getExternalAddress();
+      }
+      else {
+        return null;
+      }
+    }
+    else {
+      return this.hostnameForClients;
+    }
+  }
+
+  /**
+   * Returns whether an acceptor exists and reports itself as running.
+   */
+  public boolean isRunning() {
+    return this.acceptor != null && this.acceptor.isRunning();
+  }
+
+  /**
+   * Stops this cache server; does nothing when it is not running.
+   * <p>
+   * The load monitor, the advisor and the acceptor are closed in that order.
+   * Each of them is attempted even when an earlier one failed. Every failure
+   * is logged, and the <em>first</em> one is rethrown once all three have been
+   * attempted. When a failure is rethrown, the remaining cleanup below
+   * (listener unregistration, removal of hosted client TX states, the
+   * <code>CACHE_SERVER_STOP</code> resource event) is skipped, as before.
+   *
+   * @throws RuntimeException the first failure raised while closing the load
+   *         monitor, the advisor or the acceptor
+   */
+  public synchronized void stop() {
+    if (!isRunning()) {
+      return;
+    }
+
+    RuntimeException firstException = null;
+
+    try {
+      if(this.loadMonitor != null) {
+        this.loadMonitor.stop();
+      }
+    } catch(RuntimeException e) {
+      cache.getLoggerI18n().warning(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_LOAD_MONITOR, e);
+      firstException = e;
+    }
+
+    try {
+      if (this.advisor != null) {
+        this.advisor.close();
+      }
+    } catch(RuntimeException e) {
+      cache.getLoggerI18n().warning(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_ADVISOR, e);
+      // keep the earliest failure; this used to overwrite it unconditionally
+      if (firstException == null) {
+        firstException = e;
+      }
+    }
+
+    try {
+      if (this.acceptor != null) {
+        this.acceptor.close();
+      }
+    } catch(RuntimeException e) {
+      logger.warn(LocalizedMessage.create(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_ACCEPTOR_MONITOR), e);
+      // the condition used to be inverted (!= null): an acceptor failure was
+      // dropped when it was the only one and replaced the first one otherwise
+      if (firstException == null) {
+        firstException = e;
+      }
+    }
+
+    if(firstException != null) {
+      throw firstException;
+    }
+
+    //TODO : We need to clean up the admin region created for client
+    //monitoring.
+
+    // BridgeServer is still available, just not running, so we don't take
+    // it out of the cache's list...
+    // cache.removeBridgeServer(this);
+
+    /* Assuming start won't be called after stop */
+    ClientMembership.unregisterClientMembershipListener(listener);
+
+    TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager();
+    txMgr.removeHostedTXStatesForClients();
+
+    // gateway receivers do not report a CACHE_SERVER_STOP resource event
+    if (!isGatewayReceiver) {
+      InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
+          .getDistributedSystem();
+      system.handleResourceEvent(ResourceEvent.CACHE_SERVER_STOP, this);
+    }
+
+  }
+
+  private String getConfig() {
+    ClientSubscriptionConfig csc = this.getClientSubscriptionConfig();
+    String str =
+    "port=" + getPort() + " max-connections=" + getMaxConnections()
+        + " max-threads=" + getMaxThreads() + " notify-by-subscription="
+        + getNotifyBySubscription() + " socket-buffer-size="
+        + getSocketBufferSize() + " maximum-time-between-pings="
+        + getMaximumTimeBetweenPings() + " maximum-message-count="
+        + getMaximumMessageCount() + " message-time-to-live="
+        + getMessageTimeToLive() + " eviction-policy=" + csc.getEvictionPolicy()
+        + " capacity=" + csc.getCapacity() + " overflow directory=";
+    if (csc.getDiskStoreName() != null) {
+      str += csc.getDiskStoreName();
+    } else {
+      str += csc.getOverflowDirectory(); 
+    }
+    str += 
+        " groups=" + Arrays.asList(getGroups())
+        + " loadProbe=" + loadProbe
+        + " loadPollInterval=" + loadPollInterval
+        + " tcpNoDelay=" + tcpNoDelay;
+    return str;
+  }
+
+  /**
+   * Describes this server by its port and its client subscription settings
+   * (eviction policy, capacity and either the overflow disk store or the
+   * overflow directory).
+   */
+  @Override
+  public String toString() {
+    final ClientSubscriptionConfig csc = this.getClientSubscriptionConfig();
+    final StringBuilder description = new StringBuilder("CacheServer on port=");
+    description.append(getPort());
+    description.append(" client subscription config policy=").append(csc.getEvictionPolicy());
+    description.append(" client subscription config capacity=").append(csc.getCapacity());
+    if (csc.getDiskStoreName() == null) {
+      description.append(" client subscription config overflow directory=")
+          .append(csc.getOverflowDirectory());
+    } else {
+      description.append(" client subscription config overflow disk store=")
+          .append(csc.getDiskStoreName());
+    }
+    return description.toString();
+  }
+
+  /**
+   * Test method used to access the internal acceptor
+   * 
+   * @return the internal acceptor; <code>null</code> until {@link #start()}
+   *         has created one
+   */
+  public AcceptorImpl getAcceptor() {
+    return this.acceptor;
+  }
+
+  // DistributionAdvisee methods
+
+  /**
+   * Returns the distribution manager of the distributed system returned by
+   * {@link #getSystem()}.
+   */
+  public DM getDistributionManager() {
+    return getSystem().getDistributionManager();
+  }
+  
+  /**
+   * Looks up the client proxy for the given durable client id in the cache
+   * client notifier.
+   *
+   * @param durableClientId the id of the durable client
+   */
+  public ClientSession getClientSession(String durableClientId) {
+    return getCacheClientNotifier().getClientProxy(durableClientId);
+  }
+
+  /**
+   * Looks up the client proxy for the given member in the cache client
+   * notifier, using the client id derived from the member by
+   * <code>ClientProxyMembershipID.getClientId</code>.
+   *
+   * @param member the member identifying the client
+   */
+  public ClientSession getClientSession(DistributedMember member) {
+    return getCacheClientNotifier().getClientProxy(
+        ClientProxyMembershipID.getClientId(member));
+  }
+  
+  /**
+   * Returns a new <code>HashSet</code> containing the client proxies
+   * currently known to the cache client notifier.
+   */
+  public Set getAllClientSessions() {
+    return new HashSet(getCacheClientNotifier().getClientProxies());
+  }
+
+  /**
+   * Creates the client subscription (client messages) region.
+   * 
+   * @param cache the cache in which the region is created
+   * @param ePolicy the eviction policy of the region
+   * @param capacity the eviction capacity
+   * @param port used to generate the name of the region
+   * @param overFlowDir the overflow directory, or the disk store name when
+   *        <code>isDiskStore</code> is set
+   * @param isDiskStore whether <code>overFlowDir</code> names a disk store
+   * @return client subscription name
+   * @since 5.7
+   */
+  public static String clientMessagesRegion(GemFireCacheImpl cache, String ePolicy,
+      int capacity, int port, String overFlowDir, boolean isDiskStore) {
+    RegionAttributes regionAttributes = getAttribFactoryForClientMessagesRegion(
+        cache, ePolicy, capacity, overFlowDir, isDiskStore).create();
+    return createClientMessagesRegion(regionAttributes, cache, capacity, port);
+  }
+
+  /**
+   * Builds the attributes factory for a client messages region: local scope,
+   * NORMAL data policy, statistics enabled, synchronous disk writes and LIFO
+   * overflow-to-disk eviction.
+   * <p>
+   * The disk store is chosen as follows:
+   * <ul>
+   * <li><code>isDiskStore</code> set: <code>overflowDir</code> is used as the
+   * disk store name;</li>
+   * <li><code>overflowDir</code> is <code>null</code> or the default overflow
+   * directory: the disk store name is set to <code>null</code>;</li>
+   * <li>otherwise: a per-process sub directory of <code>overflowDir</code> is
+   * created and a disk store named "bsi" is created on it.</li>
+   * </ul>
+   *
+   * @param cache used to create the "bsi" disk store when needed
+   * @param ePolicy the eviction policy; must be the entry or the memory policy
+   *        constant of <code>HARegionQueue</code>
+   * @param capacity the eviction capacity
+   * @param overflowDir the overflow directory, or the disk store name when
+   *        <code>isDiskStore</code> is set
+   * @param isDiskStore whether <code>overflowDir</code> names a disk store
+   * @return the configured factory
+   * @throws InvalidValueException if <code>ePolicy</code> is not a supported
+   *         eviction policy
+   * @throws GemFireIOException if the overflow directory cannot be created
+   */
+  public static AttributesFactory getAttribFactoryForClientMessagesRegion(
+      GemFireCacheImpl cache,
+      String ePolicy, int capacity, String overflowDir, boolean isDiskStore)
+      throws InvalidValueException, GemFireIOException {
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.LOCAL);
+
+    if (isDiskStore) {
+      // overflowDir parameter is actually diskstore name
+      factory.setDiskStoreName(overflowDir);
+    } else if  (overflowDir == null || overflowDir.equals(ClientSubscriptionConfig.DEFAULT_OVERFLOW_DIRECTORY)) {
+      factory.setDiskStoreName(null);
+    } else {
+      File dir = new File(overflowDir + File.separatorChar
+          + generateNameForClientMsgsRegion(OSProcess.getId()));
+      // This will delete the overflow directory when virtual machine terminates.
+      dir.deleteOnExit();
+      if (!dir.mkdirs() && !dir.isDirectory()) {
+        throw new GemFireIOException("Could not create client subscription overflow directory: "
+            + dir.getAbsolutePath());
+      }
+      File[] dirs = { dir };
+      // the created disk store is referenced by name only, so the returned
+      // instance is not kept
+      cache.createDiskStoreFactory()
+          .setAutoCompact(true)
+          .setDiskDirsAndSizes(dirs, new int[] { Integer.MAX_VALUE })
+          .create("bsi");
+      factory.setDiskStoreName("bsi");
+    }
+    // Every branch above wrote synchronously, so the call is made once here.
+    // NOTE(review): the former comments said "do async" (feature request
+    // #41479) although the code has always passed true - confirm which one
+    // is intended.
+    factory.setDiskSynchronous(true);
+
+    factory.setDataPolicy(DataPolicy.NORMAL);
+    // enable statistics
+    factory.setStatisticsEnabled(true);
+    /* setting LIFO related eviction attributes */
+    if (HARegionQueue.HA_EVICTION_POLICY_ENTRY.equals(ePolicy)) {
+      factory
+          .setEvictionAttributes(EvictionAttributesImpl.createLIFOEntryAttributes(
+              capacity, EvictionAction.OVERFLOW_TO_DISK));
+    }
+    else if (HARegionQueue.HA_EVICTION_POLICY_MEMORY.equals(ePolicy)) { // condition refinement
+      factory
+          .setEvictionAttributes(EvictionAttributesImpl.createLIFOMemoryAttributes(
+              capacity, EvictionAction.OVERFLOW_TO_DISK));
+    }
+    else {
+      // throw invalid eviction policy exception
+      throw new InvalidValueException(
+        LocalizedStrings.CacheServerImpl__0_INVALID_EVICTION_POLICY.toLocalizedString(ePolicy));
+    }
+    return factory;
+  }
+
+  /**
+   * Creates the client messages region in the given cache, under the name
+   * generated from <code>port</code>.
+   *
+   * @param attr the attributes of the region
+   * @param cache the cache in which the region is created
+   * @param capacity not used by this method
+   * @param port used to generate the name of the region
+   * @return the name of the created region
+   * @throws InternalGemFireError wrapping a RegionExistsException,
+   *         IOException or ClassNotFoundException raised by the region
+   *         creation; none of them is expected here
+   */
+  public static String createClientMessagesRegion(RegionAttributes attr,
+      GemFireCacheImpl cache, int capacity, int port) {
+    // generating unique name in VM for ClientMessagesRegion
+    final String regionName = generateNameForClientMsgsRegion(port);
+    try {
+      InternalRegionArguments regionArguments = new InternalRegionArguments()
+          .setDestroyLockFlag(true)
+          .setRecreateFlag(false)
+          .setSnapshotInputStream(null)
+          .setImageTarget(null)
+          .setIsUsedForMetaRegion(true);
+      cache.createVMRegion(regionName, attr, regionArguments);
+    }
+    catch (RegionExistsException ree) {
+      throw (InternalGemFireError) new InternalGemFireError(
+          "unexpected exception").initCause(ree);
+    }
+    catch (IOException e) {
+      // only if loading snapshot, not here
+      throw (InternalGemFireError) new InternalGemFireError(
+          "unexpected exception").initCause(e);
+    }
+    catch (ClassNotFoundException e) {
+      // only if loading snapshot, not here
+      throw (InternalGemFireError) new InternalGemFireError(
+          "unexpected exception").initCause(e);
+    }
+    return regionName;
+  }
+
+  /**
+   * Test variant of {@link #clientMessagesRegion}: the region additionally
+   * gets an entry time to live of <code>expiryTime</code> with the
+   * LOCAL_INVALIDATE expiration action.
+   *
+   * @return the name of the created region
+   */
+  public static String createClientMessagesRegionForTesting(GemFireCacheImpl cache,
+      String ePolicy, int capacity, int port, int expiryTime, String overFlowDir, boolean isDiskStore) {
+    AttributesFactory regionFactory = getAttribFactoryForClientMessagesRegion(
+        cache, ePolicy, capacity, overFlowDir, isDiskStore);
+    regionFactory.setEntryTimeToLive(
+        new ExpirationAttributes(expiryTime, ExpirationAction.LOCAL_INVALIDATE));
+    return createClientMessagesRegion(regionFactory.create(), cache, capacity, port);
+  }
+
+  /**
+   * Generates the name for the client subscription using the given id.
+   * 
+   * @param id the id appended to the name; callers in this class pass a port
+   *        or the process id
+   * @return <code>ClientSubscriptionConfigImpl.CLIENT_SUBSCRIPTION</code>,
+   *         an underscore and the id
+   * @since 5.7 
+   */
+  public static String generateNameForClientMsgsRegion(int id) {
+    return ClientSubscriptionConfigImpl.CLIENT_SUBSCRIPTION + "_" + id;
+  }
+
+  /*
+   * Marker class name to identify the lock more easily in thread dumps private
+   * static class ClientMessagesRegionLock extends Object { }
+   */
+  /**
+   * Returns the advisor of this server; <code>null</code> until
+   * {@link #start()} has created it.
+   */
+  public DistributionAdvisor getDistributionAdvisor() {
+    return this.advisor;
+  }
+  
+  /**
+   * Returns the CacheServerAdvisor for this server
+   */
+  public CacheServerAdvisor getCacheServerAdvisor() {
+    return this.advisor;
+  }
+  
+  public Profile getProfile() {
+    return getDistributionAdvisor().createProfile();
+  }
+  
+  public DistributionAdvisee getParentAdvisee() {
+    return null;
+  }
+  
+  /**
+   * Returns the underlying <code>InternalDistributedSystem</code> connection.
+   * @return the underlying <code>InternalDistributedSystem</code>
+   */
+  public InternalDistributedSystem getSystem() {
+    return (InternalDistributedSystem)this.cache.getDistributedSystem();
+  }
+  
+  public String getName() {
+    return "CacheServer";
+  }
+  
+  public String getFullPath() {
+    return getName();
+  }
+
+  private final static AtomicInteger profileSN = new AtomicInteger();
+  
+  private static int createSerialNumber() {
+    return profileSN.incrementAndGet();
+  }
+
+  /**
+   * Returns an array of all the groups of this cache server.
+   * This includes those from the groups gemfire property
+   * and those explicitly added to this server.
+   * Duplicates are dropped; first-seen order is kept (property groups first).
+   *
+   * @return the de-duplicated group names; never null
+   */
+  public String[] getCombinedGroups() {
+    // An insertion-ordered set de-duplicates in one pass, replacing the
+    // repeated linear ArrayList.contains() scans.
+    java.util.Set<String> combined = new java.util.LinkedHashSet<String>();
+    for (String g: MemberAttributes.parseGroups(null, getSystem().getConfig().getGroups())) {
+      combined.add(g);
+    }
+    for (String g: getGroups()) {
+      combined.add(g);
+    }
+    return combined.toArray(new String[combined.size()]);
+  }
+  
+  public /*synchronized causes deadlock*/ void fillInProfile(Profile profile) {
+    assert profile instanceof CacheServerProfile;
+    CacheServerProfile bp = (CacheServerProfile)profile;
+    bp.setHost(getExternalAddress(false));
+    bp.setPort(getPort());
+    bp.setGroups(getCombinedGroups());
+    bp.setMaxConnections(maxConnections);
+    bp.setInitialLoad(loadMonitor.getLastLoad());
+    bp.setLoadPollInterval(getLoadPollInterval());
+    bp.serialNumber = getSerialNumber();
+    bp.finishInit();
+  }
+
+  public int getSerialNumber() {
+    return this.serialNumber;
+  }
+
+  
+   protected CacheClientNotifier getCacheClientNotifier() {
+    return getAcceptor().getCacheClientNotifier();
+  } 
+   
+  /**
+   * Registers a new <code>InterestRegistrationListener</code> with the set of
+   * <code>InterestRegistrationListener</code>s.
+   * 
+   * @param listener
+   *                The <code>InterestRegistrationListener</code> to register
+   * @throws IllegalStateException if this cache server is not running
+   * @since 5.8Beta
+   */
+  public void registerInterestRegistrationListener(
+      InterestRegistrationListener listener) {
+    if (!this.isRunning()) {
+      throw new IllegalStateException(LocalizedStrings.CacheServerImpl_MUST_BE_RUNNING.toLocalizedString());
+    }
+    getCacheClientNotifier().registerInterestRegistrationListener(listener); 
+  }
+
+  /**
+   * Unregisters an existing <code>InterestRegistrationListener</code> from
+   * the set of <code>InterestRegistrationListener</code>s.
+   * 
+   * @param listener
+   *                The <code>InterestRegistrationListener</code> to
+   *                unregister
+   * 
+   * @since 5.8Beta
+   */
+  public void unregisterInterestRegistrationListener(
+      InterestRegistrationListener listener) {
+    getCacheClientNotifier().unregisterInterestRegistrationListener(listener);     
+  }
+
+  /**
+   * Returns a read-only set of <code>InterestRegistrationListener</code>s
+   * registered with this notifier.
+   * 
+   * @return a read-only set of <code>InterestRegistrationListener</code>s
+   *         registered with this notifier
+   * 
+   * @since 5.8Beta
+   */
+  public Set getInterestRegistrationListeners() {
+    return getCacheClientNotifier().getInterestRegistrationListeners(); 
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientRegionEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientRegionEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientRegionEventImpl.java
new file mode 100755
index 0000000..176ddcb
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientRegionEventImpl.java
@@ -0,0 +1,108 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+
+//import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+
+/**
+ * Class <code>ClientRegionEventImpl</code> is a
+ * region event with the client's
+ * host and port for notification purposes.
+ * 
+ * @author Girish Thombare
+ * 
+ * @since 5.1
+ */
+public final class ClientRegionEventImpl extends RegionEventImpl {
+
+  /**
+   * The membership id from which this event originated.
+   */
+  private  ClientProxyMembershipID context;
+
+  public ClientRegionEventImpl() {
+  }
+
+  /**
+   * To be called from the Distributed Message without setting EventID.
+   *
+   * @param region passed to the superclass constructor
+   * @param op passed to the superclass constructor
+   * @param callbackArgument passed to the superclass constructor
+   * @param originRemote passed to the superclass constructor
+   * @param distributedMember passed to the superclass constructor
+   * @param contx the membership id originating this event
+   */
+  public ClientRegionEventImpl(LocalRegion region, Operation op, Object callbackArgument,
+      boolean originRemote, DistributedMember distributedMember, ClientProxyMembershipID contx) {
+    super(region, op, callbackArgument, originRemote, distributedMember);
+    this.context = contx;
+  }
+
+  /** Variant that also hands the given EventID to the superclass constructor. */
+  public ClientRegionEventImpl(LocalRegion region, Operation op, Object callbackArgument,
+      boolean originRemote, DistributedMember distributedMember, ClientProxyMembershipID contx,
+      EventID eventId) {
+    super(region, op, callbackArgument, originRemote, distributedMember, eventId);
+    this.context = contx;
+  }
+
+  /** Sets the membership id originating this event. */
+  protected void setContext(ClientProxyMembershipID contx) {
+    this.context = contx;
+  }
+
+  /**
+   * Returns the context originating this event.
+   * @return the context originating this event
+   */
+  @Override
+  public ClientProxyMembershipID getContext() {
+    return this.context;
+  }
+
+  /**
+   * Returns the superclass string with its last character removed, followed
+   * by <code>;context=</code>, the context, and a closing bracket.
+   */
+  @Override
+  public String toString() {
+    final String parentText = super.toString();
+    final String withoutLastChar = parentText.substring(0, parentText.length() - 1);
+    return new StringBuilder(withoutLastChar)
+        .append(";context=").append(getContext()).append(']').toString();
+  }
+
+  @Override
+  public int getDSFID() {
+    return CLIENT_REGION_EVENT;
+  }
+
+  /** Writes the superclass data, then the context object. */
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    DataSerializer.writeObject(getContext(), out);
+  }
+
+  /** Reads the superclass data, then the context. */
+  @Override
+  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+    super.fromData(in);
+    setContext(ClientProxyMembershipID.readCanonicalized(in));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserver.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserver.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserver.java
new file mode 100755
index 0000000..0646f04
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserver.java
@@ -0,0 +1,90 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * This interface is used by testing/debugging code to be notified of different
+ * client/server events.
+ * See the documentation for class ClientServerObserverHolder for details.
+ * 
+ * @author Yogesh Mahajan
+ * @since 5.1
+ *  
+ */
+public interface ClientServerObserver
+{
+  /**
+   * This callback is called when a new primary endpoint (Ep) is identified.
+   */
+  public void afterPrimaryIdentificationFromBackup(ServerLocation location);
+
+  /**
+   * This callback is called just before interest registration.
+   */
+  public void beforeInterestRegistration();
+
+  /**
+   * This callback is called just after interest registration.
+   */
+  public void afterInterestRegistration();
+
+  /**
+   * This callback is called just before primary identification.
+   */
+  public void beforePrimaryIdentificationFromBackup();
+
+  /**
+   * This callback is called just before Interest Recovery by DSM thread happens.
+   */
+  public void beforeInterestRecovery();
+  
+  /**
+   * Invoked by CacheClientUpdater just before invoking endpointDied for
+   * fail over.
+   * @param location ServerLocation which has failed
+   */
+  public void beforeFailoverByCacheClientUpdater(ServerLocation location);
+  /**
+   * Invoked before sending an instantiator message to server.
+   * 
+   * @param eventId the EventID supplied by the caller
+   */
+  public void beforeSendingToServer(EventID eventId);
+  /**
+   * Invoked after sending an instantiator message to server.
+   * 
+   * @param eventId the EventID supplied by the caller
+   */
+  public void afterReceivingFromServer(EventID eventId);
+
+  /**
+   * This callback is called just before sending client ack to the primary server.
+   */
+   public void beforeSendingClientAck();  
+
+   /**
+    * Invoked after Message is created.
+    *
+    * @param msg the Message that was created
+    */
+   public void afterMessageCreation(Message msg);
+   
+   /**
+    * Invoked after Queue Destroy Message has been sent.
+    */
+   public void afterQueueDestroyMessage();
+   
+   /**
+    * Invoked after a primary is recovered from a backup or new connection. 
+    */
+   public void afterPrimaryRecovered(ServerLocation location);
+   
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverAdapter.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverAdapter.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverAdapter.java
new file mode 100755
index 0000000..094bb58
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverAdapter.java
@@ -0,0 +1,107 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * This class provides 'do-nothing' implementations of all of the methods of
+ * interface ClientServerObserver. See the documentation for class
+ * ClientServerObserverHolder for details.
+ * 
+ * @author Yogesh Mahajan
+ * @since 5.1
+ */
+public class ClientServerObserverAdapter implements ClientServerObserver
+{
+  /**
+   * This callback is called when a new primary endpoint (Ep) is identified.
+   */
+  public void afterPrimaryIdentificationFromBackup(ServerLocation primaryEndpoint)
+  {
+  }
+
+  /**
+   * This callback is called just before interest registration.
+   */
+  public void beforeInterestRegistration()
+  {
+  }
+
+  /**
+   * This callback is called just after interest registration.
+   */
+  public void afterInterestRegistration()
+  {
+  }
+
+  /**
+   * This callback is called just before primary identification.
+   */
+  public void beforePrimaryIdentificationFromBackup()
+  {
+  }
+
+  /**
+   * This callback is called just before Interest Recovery by DSM thread happens.
+   */
+  public void beforeInterestRecovery()
+  {
+
+  }
+
+  public void beforeFailoverByCacheClientUpdater(ServerLocation epFailed)
+  {
+  }
+  /**
+   * Invoked before sending an instantiator message to server.
+   * 
+   * @param eventId the EventID supplied by the caller
+   */
+  public void beforeSendingToServer(EventID eventId){
+    
+  }
+  /**
+   * Invoked after sending an instantiator message to server.
+   * 
+   * @param eventId the EventID supplied by the caller
+   */
+  public void afterReceivingFromServer(EventID eventId){
+    
+  }
+  
+  /**
+   * This callback is called just before sending client ack to the primary server.
+   */
+  public void beforeSendingClientAck(){
+    
+  }  
+
+  /**
+   * Invoked after Message is created.
+   *
+   * @param msg the Message that was created
+   */
+  public void afterMessageCreation(Message msg){
+  
+  }
+  
+  /**
+   * Invoked after Queue Destroy Message has been sent.
+   */
+  public void afterQueueDestroyMessage(){
+    
+  }
+  
+  /**
+   * Invoked after a primary is recovered from a backup or new connection. 
+   */
+  public void afterPrimaryRecovered(ServerLocation location) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverHolder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverHolder.java
new file mode 100755
index 0000000..003852b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverHolder.java
@@ -0,0 +1,53 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.cache.query.internal.Support;
+
+/**
+ * This class is intended to hold a single 'observer' which will receive
+ * callbacks. There can be only one such observer at a time. If no observer is
+ * needed, this member variable should point to an object with 'do-nothing'
+ * methods, such as ClientServerObserverAdapter.
+ * 
+ * @author Yogesh Mahajan
+ * @since 5.1
+ */
+public class ClientServerObserverHolder {
+
+  /**
+   * The default 'do-nothing' client/server observer.
+   */
+  private static final ClientServerObserver NO_OBSERVER = new ClientServerObserverAdapter();
+
+  /**
+   * The observer currently installed; initially the do-nothing observer.
+   */
+  private static ClientServerObserver _instance = NO_OBSERVER;
+
+  /**
+   * Installs the given observer and returns the one it replaces.
+   *
+   * @param observer the observer to install; must not be null
+   * @return the observer that was installed before this call
+   */
+  public static final ClientServerObserver setInstance(ClientServerObserver observer) {
+    Support.assertArg(observer != null, "setInstance expects a non-null argument!");
+    final ClientServerObserver previous = _instance;
+    _instance = observer;
+    return previous;
+  }
+
+  /**
+   * Returns the observer that is currently installed.
+   */
+  public static final ClientServerObserver getInstance() {
+    return _instance;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java
index 921cbf9..198803d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java
@@ -105,9 +105,9 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
   protected CacheOperationMessage createMessage()
   {
     DestroyRegionMessage mssg;
-    if (this.event instanceof BridgeRegionEventImpl) {
+    if (this.event instanceof ClientRegionEventImpl) {
       mssg = new DestroyRegionWithContextMessage();
-      ((DestroyRegionWithContextMessage)mssg).context = ((BridgeRegionEventImpl)this.event)
+      ((DestroyRegionWithContextMessage)mssg).context = ((ClientRegionEventImpl)this.event)
           .getContext();
     }
     else {
@@ -502,7 +502,7 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
     @Override
     final public RegionEventImpl createRegionEvent(DistributedRegion rgn)
     {
-      BridgeRegionEventImpl event = new BridgeRegionEventImpl(rgn,
+      ClientRegionEventImpl event = new ClientRegionEventImpl(rgn,
           getOperation(), this.callbackArg, true /* originRemote */,
           getSender(), (ClientProxyMembershipID)this.context);
       return event;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
index 4dbc9c4..8056120 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
@@ -128,9 +128,9 @@ public class DistributedClearOperation extends DistributedCacheOperation
   protected CacheOperationMessage createMessage()
   {
     ClearRegionMessage mssg;
-    if (this.event instanceof BridgeRegionEventImpl) {
+    if (this.event instanceof ClientRegionEventImpl) {
       mssg = new ClearRegionWithContextMessage();
-      ((ClearRegionWithContextMessage)mssg).context = ((BridgeRegionEventImpl)this.event)
+      ((ClearRegionWithContextMessage)mssg).context = ((ClientRegionEventImpl)this.event)
           .getContext();
 
     }
@@ -271,7 +271,7 @@ public class DistributedClearOperation extends DistributedCacheOperation
     final public RegionEventImpl createRegionEvent(DistributedRegion rgn)
     {
       
-      BridgeRegionEventImpl event = new BridgeRegionEventImpl(rgn,
+      ClientRegionEventImpl event = new ClientRegionEventImpl(rgn,
           getOperation(), this.callbackArg, true /* originRemote */,
           getSender(), (ClientProxyMembershipID)this.context);
       return event;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 790dc4d..77fbc88 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -2893,9 +2893,6 @@ public class DistributedRegion extends LocalRegion implements
   protected void cacheWriterChanged(CacheWriter oldWriter)
   {
     super.cacheWriterChanged(oldWriter);
-    if (isBridgeWriter(oldWriter)) {
-      oldWriter = null;
-    }
     if (oldWriter == null ^ basicGetWriter() == null) {
       new UpdateAttributesProcessor(this).distribute();
     }
@@ -2905,9 +2902,6 @@ public class DistributedRegion extends LocalRegion implements
   protected void cacheLoaderChanged(CacheLoader oldLoader)
   {
     super.cacheLoaderChanged(oldLoader);
-    if (isBridgeLoader(oldLoader)) {
-      oldLoader = null;
-    }
     if (oldLoader == null ^ basicGetLoader() == null) {
       new UpdateAttributesProcessor(this).distribute();
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
index 5c428b2..357c0a8 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
@@ -28,7 +28,6 @@ import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.cache.ExpirationAction;
 import com.gemstone.gemfire.cache.ExpirationAttributes;
 import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.util.BridgeWriterException;
 import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 import com.gemstone.gemfire.distributed.internal.PooledExecutorWithDMStats;
 import com.gemstone.gemfire.internal.SystemTimer;
@@ -364,23 +363,6 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask {
     } 
     catch (CancelException ex) {
       // ignore
-
-      // @todo grid: do we need to deal with pool exceptions here?
-     } catch (BridgeWriterException ex) {
-       // Some exceptions from the bridge writer should not be logged.
-       Throwable cause = ex.getCause();
-       // BridgeWriterExceptions from the server are wrapped in CacheWriterExceptions
-       if (cause != null && cause instanceof CacheWriterException)
-           cause = cause.getCause();
-       if (cause instanceof RegionDestroyedException ||
-           cause instanceof EntryNotFoundException ||
-           cause instanceof CancelException) {
-         if (logger.isDebugEnabled()) {
-           logger.debug("Exception in expiration task", ex);
-         }
-       } else {
-         logger.fatal(LocalizedMessage.create(LocalizedStrings.ExpiryTask_EXCEPTION_IN_EXPIRATION_TASK), ex);
-       }
     } 
      catch (VirtualMachineError err) {
        SystemFailure.initiateFailure(err);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java
index 1366f94..8d782a9 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java
@@ -76,11 +76,11 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
   private static void findLocalDurableQueues(ClientProxyMembershipID proxyId, ArrayList<ServerLocation> matches) {
     Cache c = GemFireCacheImpl.getInstance();
     if(c!=null) {
-      List l = c.getBridgeServers();
+      List l = c.getCacheServers();
       if(l!=null) {
         Iterator i = l.iterator();
         while(i.hasNext()) {
-          BridgeServerImpl bs = (BridgeServerImpl)i.next();
+          CacheServerImpl bs = (CacheServerImpl)i.next();
           if(bs.getAcceptor().getCacheClientNotifier().getClientProxy(proxyId)!=null) {
             ServerLocation loc = new ServerLocation(bs.getExternalAddress(),bs.getPort());
             matches.add(loc);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 79bcbc2..4bf0f42 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -136,7 +136,6 @@ import com.gemstone.gemfire.cache.query.internal.cq.CqService;
 import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider;
 import com.gemstone.gemfire.cache.server.CacheServer;
 import com.gemstone.gemfire.cache.snapshot.CacheSnapshotService;
-import com.gemstone.gemfire.cache.util.BridgeServer;
 import com.gemstone.gemfire.cache.util.GatewayConflictResolver;
 import com.gemstone.gemfire.cache.util.ObjectSizer;
 import com.gemstone.gemfire.cache.wan.GatewayReceiver;
@@ -371,11 +370,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   private volatile DistributionAdvisee sqlfAdvisee;
 
   /**
-   * the list of all bridge servers. CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval
-   * operations. It is assumed that the traversal operations on bridge servers list vastly outnumber the mutative
+   * the list of all cache servers. CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval
+   * operations. It is assumed that the traversal operations on cache servers list vastly outnumber the mutative
    * operations such as add, remove.
    */
-  private volatile List allBridgeServers = new CopyOnWriteArrayList();
+  private volatile List allCacheServers = new CopyOnWriteArrayList();
 
   /**
    * Controls updates to the list of all gateway senders
@@ -664,7 +663,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     sb.append("; lockLease = " + this.lockLease);
     sb.append("; lockTimeout = " + this.lockTimeout);
     // sb.append("; rootRegions = (" + this.rootRegions + ")");
-    // sb.append("; bridgeServers = (" + this.bridgeServers + ")");
+    // sb.append("; cacheServers = (" + this.cacheServers + ")");
     // sb.append("; regionAttributes = (" + this.listRegionAttributes());
     // sb.append("; gatewayHub = " + gatewayHub);
     if (this.creationStack != null) {
@@ -1513,7 +1512,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
   }
 
   /**
-   * Close the distributed system, bridge servers, and gateways. Clears the rootRegions and partitionedRegions map.
+   * Close the distributed system, cache servers, and gateways. Clears the rootRegions and partitionedRegions map.
    * Marks the cache as closed.
    *
    * @see SystemFailure#emergencyClose()
@@ -1546,14 +1545,14 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     inst.disconnectCause = SystemFailure.getFailure();
     inst.isClosing = true;
 
-    // Clear bridge servers
+    // Clear cache servers
     if (DEBUG) {
-      System.err.println("DEBUG: Close bridge servers");
+      System.err.println("DEBUG: Close cache servers");
     }
     {
-      Iterator allBridgeServersItr = inst.allBridgeServers.iterator();
-      while (allBridgeServersItr.hasNext()) {
-        BridgeServerImpl bs = (BridgeServerImpl) allBridgeServersItr.next();
+      Iterator allCacheServersItr = inst.allCacheServers.iterator();
+      while (allCacheServersItr.hasNext()) {
+        CacheServerImpl bs = (CacheServerImpl) allCacheServersItr.next();
         AcceptorImpl ai = bs.getAcceptor();
         if (ai != null) {
           ai.emergencyClose();
@@ -1986,7 +1985,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
       TXStateProxy tx = null;
       try {
         this.keepAlive = keepalive;
-        PoolManagerImpl.setKeepAlive(keepalive);
 
         if (this.txMgr != null) {
           tx = this.txMgr.internalSuspend();
@@ -2044,7 +2042,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
           stopRedisServer();
 
           // no need to track PR instances since we won't create any more
-          // bridgeServers or gatewayHubs
+          // cacheServers or gatewayHubs
           if (this.partitionedRegions != null) {
             if (isDebugEnabled) {
               logger.debug("{}: clearing partitioned regions...", this);
@@ -2616,12 +2614,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     final boolean isDebugEnabled = logger.isDebugEnabled();
     
     if (isDebugEnabled) {
-      logger.debug("{}: stopping bridge servers...", this);
+      logger.debug("{}: stopping cache servers...", this);
     }
-    boolean stoppedBridgeServer = false;
-    Iterator allBridgeServersIterator = this.allBridgeServers.iterator();
-    while (allBridgeServersIterator.hasNext()) {
-      BridgeServerImpl bridge = (BridgeServerImpl) allBridgeServersIterator.next();
+    boolean stoppedCacheServer = false;
+    Iterator allCacheServersIterator = this.allCacheServers.iterator();
+    while (allCacheServersIterator.hasNext()) {
+      CacheServerImpl bridge = (CacheServerImpl) allCacheServersIterator.next();
       if (isDebugEnabled) {
         logger.debug("stopping bridge {}", bridge);
       }
@@ -2632,11 +2630,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
           logger.debug("Ignored cache closure while closing bridge {}", bridge, e);
         }
       }
-      allBridgeServers.remove(bridge);
-      stoppedBridgeServer = true;
+      allCacheServers.remove(bridge);
+      stoppedCacheServer = true;
     }
-    if (stoppedBridgeServer) {
-      // now that all the bridge servers have stopped empty the static pool of commBuffers it might have used.
+    if (stoppedCacheServer) {
+      // now that all the cache servers have stopped empty the static pool of commBuffers it might have used.
       ServerConnection.emptyCommBufferPool();
     }
     
@@ -3784,10 +3782,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     return this.eventThreadPool;
   }
 
-  public BridgeServer addBridgeServer() {
-    return (BridgeServer) addCacheServer();
-  }
-
   public CacheServer addCacheServer() {
     return addCacheServer(false);
   }
@@ -3798,8 +3792,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     }
     stopper.checkCancelInProgress(null);
 
-    BridgeServerImpl bridge = new BridgeServerImpl(this, isGatewayReceiver);
-    allBridgeServers.add(bridge);
+    CacheServerImpl bridge = new CacheServerImpl(this, isGatewayReceiver);
+    allCacheServers.add(bridge);
 
     sendAddCacheServerProfileMessage();
     return bridge;
@@ -3972,33 +3966,29 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     }
   }
 
-  public List getBridgeServers() {
-    return getCacheServers();
-  }
-
   public List getCacheServers() {
-    List bridgeServersWithoutReceiver = null;
-    if (!allBridgeServers.isEmpty()) {
-    Iterator allBridgeServersIterator = allBridgeServers.iterator();
-    while (allBridgeServersIterator.hasNext()) {
-      BridgeServerImpl bridgeServer = (BridgeServerImpl) allBridgeServersIterator.next();
-      // If BridgeServer is a GatewayReceiver, don't return as part of CacheServers
-      if (!bridgeServer.isGatewayReceiver()) {
-        if (bridgeServersWithoutReceiver == null) {
-          bridgeServersWithoutReceiver = new ArrayList();
+    List cacheServersWithoutReceiver = null;
+    if (!allCacheServers.isEmpty()) {
+    Iterator allCacheServersIterator = allCacheServers.iterator();
+    while (allCacheServersIterator.hasNext()) {
+      CacheServerImpl cacheServer = (CacheServerImpl) allCacheServersIterator.next();
+      // If CacheServer is a GatewayReceiver, don't return as part of CacheServers
+      if (!cacheServer.isGatewayReceiver()) {
+        if (cacheServersWithoutReceiver == null) {
+          cacheServersWithoutReceiver = new ArrayList();
         }
-        bridgeServersWithoutReceiver.add(bridgeServer);
+        cacheServersWithoutReceiver.add(cacheServer);
       }
     }
     }
-    if (bridgeServersWithoutReceiver == null) {
-      bridgeServersWithoutReceiver = Collections.emptyList();
+    if (cacheServersWithoutReceiver == null) {
+      cacheServersWithoutReceiver = Collections.emptyList();
     }
-    return bridgeServersWithoutReceiver;
+    return cacheServersWithoutReceiver;
   }
 
-  public List getBridgeServersAndGatewayReceiver() {
-    return allBridgeServers;
+  public List getCacheServersAndGatewayReceiver() {
+    return allCacheServers;
   }
 
   /**
@@ -4126,9 +4116,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
       boolean hasSerialSenders = hasSerialSenders(r);
       boolean result = hasSerialSenders;
       if (!result) {
-        Iterator allBridgeServersIterator = allBridgeServers.iterator();
-        while (allBridgeServersIterator.hasNext()) {
-          BridgeServerImpl server = (BridgeServerImpl) allBridgeServersIterator.next();
+        Iterator allCacheServersIterator = allCacheServers.iterator();
+        while (allCacheServersIterator.hasNext()) {
+          CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next();
           if (!server.getNotifyBySubscription()) {
             result = true;
             break;
@@ -4182,7 +4172,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
     stopper.checkCancelInProgress(null);
 
     if (!this.isServer) {
-      return (this.allBridgeServers.size() > 0);
+      return (this.allCacheServers.size() > 0);
     } else {
       return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java
index acafd6d..13d6068 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java
@@ -45,7 +45,7 @@ public abstract class GridAdvisor extends DistributionAdvisor {
 
   private static final Filter BRIDGE_SERVER_FILTER = new Filter() {
       public boolean include(Profile profile) {
-        return profile instanceof BridgeServerAdvisor.BridgeServerProfile;
+        return profile instanceof CacheServerAdvisor.CacheServerProfile;
       }
     };
   
@@ -327,9 +327,9 @@ public abstract class GridAdvisor extends DistributionAdvisor {
         boolean exchangeProfiles, final List<Profile> replyProfiles) {
       final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
       if (cache != null && !cache.isClosed()) {
-        List<?> bridgeServers = cache.getBridgeServersAndGatewayReceiver();
+        List<?> bridgeServers = cache.getCacheServersAndGatewayReceiver();
         for (int i = 0; i < bridgeServers.size(); i++) {
-          BridgeServerImpl bsi = (BridgeServerImpl)bridgeServers.get(i);
+          CacheServerImpl bsi = (CacheServerImpl)bridgeServers.get(i);
           if (bsi.isRunning()) {
             if(bsi.getProfile().equals(this)) {
               continue;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 7c1ec89..9e5bcd2 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -100,7 +100,6 @@ import com.gemstone.gemfire.cache.TransactionId;
 import com.gemstone.gemfire.cache.client.PoolManager;
 import com.gemstone.gemfire.cache.client.ServerOperationException;
 import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
-import com.gemstone.gemfire.cache.client.internal.BridgePoolImpl;
 import com.gemstone.gemfire.cache.client.internal.Connection;
 import com.gemstone.gemfire.cache.client.internal.Endpoint;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl;
@@ -134,7 +133,6 @@ import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
 import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
 import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol;
 import com.gemstone.gemfire.cache.query.internal.index.IndexUtils;
-import com.gemstone.gemfire.cache.util.BridgeWriterException;
 import com.gemstone.gemfire.cache.util.ObjectSizer;
 import com.gemstone.gemfire.cache.wan.GatewaySender;
 import com.gemstone.gemfire.distributed.DistributedMember;
@@ -674,9 +672,7 @@ public class LocalRegion extends AbstractRegion
     }
     
     // initialize client to server proxy
-    this.srp = ((this.getPoolName() != null)
-                || isBridgeLoader(this.getCacheLoader())
-                || isBridgeWriter(this.getCacheWriter()))
+    this.srp = (this.getPoolName() != null)
       ? new ServerRegionProxy(this)
       : null;
     this.imageState =
@@ -3986,22 +3982,6 @@ public class LocalRegion extends AbstractRegion
     reinitialize(inputStream, event);
   }
 
-//   public void createRegionOnServer() throws CacheWriterException
-//   {
-//     if (basicGetWriter() instanceof BridgeWriter) {
-//       if (getParentRegion() != null) {
-//         BridgeWriter bw = (BridgeWriter)basicGetWriter();
-//         bw.createRegionOnServer(getParentRegion().getFullPath(), getName());
-//       }
-//       else {
-//        throw new CacheWriterException(LocalizedStrings.LocalRegion_REGION_0_IS_A_ROOT_REGION_ONLY_NONROOT_REGIONS_CAN_BE_CREATED_ON_THE_SERVER.toLocalizedString(getFullPath()));
-//       }
-//     }
-//     else {
-//      throw new CacheWriterException(LocalizedStrings.LocalRegion_SERVER_REGION_CREATION_IS_ONLY_SUPPORTED_ON_CLIENT_SERVER_TOPOLOGIES_THE_CURRENT_CACHEWRITER_IS_0.toLocalizedString(this.cacheWriter));
-//     }
-//   }
-
   public void registerInterest(Object key)
   {
     registerInterest(key, false);
@@ -4068,13 +4048,8 @@ public class LocalRegion extends AbstractRegion
       throw new IllegalStateException(LocalizedStrings.LocalRegion_DURABLE_FLAG_ONLY_APPLICABLE_FOR_DURABLE_CLIENTS.toLocalizedString());
     }
     if (!proxy.getPool().getSubscriptionEnabled()) {
-      if (proxy.getPool() instanceof BridgePoolImpl) {
-        String msg = "Interest registration requires establishCallbackConnection to be set to true.";
-        throw new BridgeWriterException(msg);
-      } else {
-        String msg = "Interest registration requires a pool whose queue is enabled.";
-        throw new SubscriptionNotEnabledException(msg);
-      }
+      String msg = "Interest registration requires a pool whose queue is enabled.";
+      throw new SubscriptionNotEnabledException(msg);
     }
 
     if (getAttributes().getDataPolicy().withReplication() // fix for bug 36185
@@ -4101,7 +4076,7 @@ public class LocalRegion extends AbstractRegion
       this.clearKeysOfInterest(key, interestType, pol);
       // Checking for the Dunit test(testRegisterInterst_Destroy_Concurrent) flag
       if (PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG) {
-        BridgeObserver bo = BridgeObserverHolder.getInstance();
+        ClientServerObserver bo = ClientServerObserverHolder.getInstance();
         bo.beforeInterestRegistration();
       }// Test Code Ends
       final byte regionDataPolicy = getAttributes().getDataPolicy().ordinal;
@@ -9786,7 +9761,7 @@ public class LocalRegion extends AbstractRegion
       }
     }
 
-    RegionEventImpl event = new BridgeRegionEventImpl(this, Operation.REGION_DESTROY,
+    RegionEventImpl event = new ClientRegionEventImpl(this, Operation.REGION_DESTROY,
          callbackArg,false, client.getDistributedMember(), client/* context */, eventId);
 
     basicDestroyRegion(event, true);
@@ -9811,7 +9786,7 @@ public class LocalRegion extends AbstractRegion
       }
     }
 
-    RegionEventImpl event = new BridgeRegionEventImpl(this, Operation.REGION_CLEAR,
+    RegionEventImpl event = new ClientRegionEventImpl(this, Operation.REGION_CLEAR,
          callbackArg,false, client.getDistributedMember(), client/* context */, eventId);
 
     basicClear(event, true);
@@ -11310,7 +11285,7 @@ public class LocalRegion extends AbstractRegion
    */
   protected boolean shouldNotifyBridgeClients()
   {
-    return (this.cache.getBridgeServers().size() > 0)
+    return (this.cache.getCacheServers().size() > 0)
         && !this.isUsedForPartitionedRegionAdmin
         && !this.isUsedForPartitionedRegionBucket
         && !this.isUsedForMetaRegion;
@@ -11444,7 +11419,7 @@ public class LocalRegion extends AbstractRegion
       predicate = predicate.trim();
 
       // Compare the query patterns to the 'predicate'. If one matches,
-      // send it as is to the BridgeLoader
+      // send it as is to the server
       boolean matches = false;
       for (int i=0; i<QUERY_PATTERNS.length; i++) {
         if (QUERY_PATTERNS[i].matcher(predicate).matches()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index c007891..629c5a4 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -5457,9 +5457,6 @@ public class PartitionedRegion extends LocalRegion implements
   protected void cacheWriterChanged(CacheWriter p_oldWriter) {
     CacheWriter oldWriter = p_oldWriter;
     super.cacheWriterChanged(oldWriter);
-    if (isBridgeWriter(oldWriter)) {
-      oldWriter = null;
-    }
     if (oldWriter == null ^ basicGetWriter() == null) {
       new UpdateAttributesProcessor(this).distribute();
     }
@@ -5469,9 +5466,6 @@ public class PartitionedRegion extends LocalRegion implements
   @Override
   protected void cacheLoaderChanged(CacheLoader oldLoader) {
     CacheLoader myOldLoader = oldLoader;
-    if (isBridgeLoader(oldLoader)) {
-      myOldLoader = null;
-    }
     this.dataStore.cacheLoaderChanged(basicGetLoader(), myOldLoader);
     super.cacheLoaderChanged(oldLoader);
     if (myOldLoader == null ^ basicGetLoader() == null) {
@@ -5902,7 +5896,7 @@ public class PartitionedRegion extends LocalRegion implements
     Collections.addAll(localServerGroups, MemberAttributes.parseGroups(null, c.getSystem().getConfig().getGroups()));
     
     for (Object object : servers) {
-      BridgeServerImpl server = (BridgeServerImpl)object;
+      CacheServerImpl server = (CacheServerImpl)object;
       if (server.isRunning() && (server.getExternalAddress() != null)) {
         Collections.addAll(localServerGroups, server.getGroups());
       }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java
index 4569184..0838d29 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java
@@ -27,8 +27,6 @@ import com.gemstone.gemfire.cache.CacheException;
 import com.gemstone.gemfire.cache.RegionService;
 import com.gemstone.gemfire.cache.client.Pool;
 import com.gemstone.gemfire.cache.client.PoolFactory;
-import com.gemstone.gemfire.cache.client.internal.BridgePoolImpl;
-import com.gemstone.gemfire.cache.client.internal.BridgePoolImpl.LBPolicy;
 import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallback;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 import com.gemstone.gemfire.cache.query.QueryService;
@@ -286,141 +284,14 @@ public class PoolFactoryImpl implements PoolFactory {
     this.attributes.servers.addAll(cp.getServers());
   }
 
-  public void init(Properties properties, boolean usedByBridgeWriter,
-      boolean usedByGateway, GatewaySender sender) {
+  public void init(GatewaySender sender) {
+    this.attributes.setGateway(true);
     this.attributes.setGatewaySender(sender);
-    init(properties, usedByBridgeWriter, usedByGateway);
-  }
-  /**
-   * Used to create a pool given the old Bridge properties
-   * @param properties from a BridgeWriter or BridgeLoader
-   * @param usedByBridgeWriter true  if props from BridgeWriter;
-   *                           false if props from BridgeLoader.
-   * *param usedByGateway true if props from GatewayImpl.
-   * @since 5.7
-   */
-  public void init(Properties properties, boolean usedByBridgeWriter,
-                   boolean usedByGateway) {
-    this.attributes.setBridge(usedByBridgeWriter || !usedByGateway);
-    this.attributes.setBridgeWriter(usedByBridgeWriter);
-    this.attributes.setGateway(usedByGateway);
     setIdleTimeout(-1); // never time out
     setLoadConditioningInterval(-1); // never time out
     setMaxConnections(-1);
-    int endpointCount = 0;
-    boolean useLocators = false;
-    boolean useEndPoints = false;
-    IllegalArgumentException exception = null;
-    if (properties.containsKey(DistributionConfig.LOCATORS_NAME)) {
-      String locatorObject = properties
-          .getProperty(DistributionConfig.LOCATORS_NAME);
-      if (locatorObject != null && !locatorObject.equals("")) {
-        StringTokenizer locatorsOnThisVM = new StringTokenizer(locatorObject, ",");
-        while (locatorsOnThisVM.hasMoreTokens()) {
-          String localLocator = locatorsOnThisVM.nextToken();
-          DistributionLocatorId locatorId = new DistributionLocatorId(
-              localLocator);
-          addLocator(locatorId.getHost().getHostName(), locatorId.getPort());
-        }
-        useLocators = true;
-      }
-    }
-    if (!useLocators && properties.containsKey("endpoints")) {
-      useEndPoints = true;
-      String pv = properties.getProperty("endpoints");
-      StringTokenizer tokenizer = new StringTokenizer(pv, ",");
-      while (tokenizer.hasMoreTokens()) {
-        String serverdetail = tokenizer.nextToken();
-        int cIndex = serverdetail.indexOf("=");
-        // note we throw the name away
-//         String name = serverdetail.substring(0, cIndex);
-//         if (name != null) {
-//           name = name.trim();
-//         }
-        String remainder = serverdetail.substring(cIndex + 1);
-        cIndex = remainder.lastIndexOf(":");
-        String host = remainder.substring(0, cIndex);
-        if (host != null) {
-          host = host.trim();
-        }
-        String port = remainder.substring(cIndex + 1);
-        if (port != null) {
-          port = port.trim();
-        }
-        try {
-          addServer(host, Integer.parseInt(port));
-          endpointCount++;
-        } catch (IllegalArgumentException e) {
-          if (!(e.getCause() instanceof UnknownHostException)) {
-            throw e;
-          } else {
-            exception = e;
-          }
-        }
-      }
-      if ((endpointCount == 0) && (exception != null)) {
-        IllegalArgumentException ex = new IllegalArgumentException("Couldn't find any Endpoint. " + exception.getMessage());
-        ex.initCause(exception.getCause());
-        throw ex;
-      }
-    }
-    if(!useLocators && !useEndPoints) {
-      throw new IllegalArgumentException(
-          "Property 'locators ' or 'endpoints' must be specified");
-    }
-    // @todo grid: handshakeTimeout ignored
-    {
-      // @todo grid: roundRobin and appAssisted ignored
-      LBPolicy policy = new LBPolicy(properties.getProperty("LBPolicy",
-          LBPolicy.STICKY_PROPERTY_NAME));
-      setThreadLocalConnections(policy.isSticky());
-    }
-    
-    if (properties.containsKey("retryAttempts")) {
-      String strRetryAttempts = properties.getProperty("retryAttempts");
-      setRetryAttempts(Integer.parseInt(strRetryAttempts));
-    }
-    if (properties.containsKey("retryInterval")) {
-      String strRetryAttempts = properties.getProperty("retryInterval");
-      setPingInterval(Integer.parseInt(strRetryAttempts));
-    }
-    if (properties.containsKey("establishCallbackConnection")) {
-      String str = properties.getProperty("establishCallbackConnection");
-      setSubscriptionEnabled(Boolean.valueOf(str).booleanValue());
-    }
-    if (properties.containsKey("enablePRSingleHop")) {
-      String str = properties.getProperty("enablePRSingleHop");
-      setPRSingleHopEnabled(Boolean.valueOf(str).booleanValue());
-    }
-    if (properties.containsKey("connectionsPerServer")) {
-      String str = properties.getProperty("connectionsPerServer");
-      setMinConnections(Integer.parseInt(str)*endpointCount);
-    } else {
-      setMinConnections(1*endpointCount);
-    }
-    if (properties.containsKey("redundancyLevel")) {
-      String str = properties.getProperty("redundancyLevel");
-      setSubscriptionRedundancy(Integer.parseInt(str));
-    }
-    if (properties.containsKey("readTimeout")) {
-      String strReadTimeout = properties.getProperty("readTimeout");
-      setReadTimeout(Integer.parseInt(strReadTimeout));
-    }
-    if (properties.containsKey("socketBufferSize")) {
-      String strSocketBufferSize = properties.getProperty("socketBufferSize");
-      setSocketBufferSize(Integer.parseInt(strSocketBufferSize));
-    }
-    if (properties.containsKey("messageTrackingTimeout")) {
-      String pv = properties.getProperty("messageTrackingTimeout");
-      setSubscriptionMessageTrackingTimeout(Integer.parseInt(pv));
-    }
-    if(properties.containsKey("clientAckInterval") ) {
-      String pv = properties.getProperty("clientAckInterval");
-      setSubscriptionAckInterval(Integer.parseInt(pv));
-    }
-    if(usedByGateway && exception!= null) {
-      throw exception;
-    }
+    setMinConnections(0);
+    setThreadLocalConnections(true);
   }
   
   /**
@@ -441,11 +312,7 @@ public class PoolFactoryImpl implements PoolFactory {
         registry.creatingPool();
       }
     }
-    if (this.attributes.isBridge()) {
-      return new BridgePoolImpl(this.pm, name, this.attributes);
-    } else {
-      return PoolImpl.create(this.pm, name, this.attributes);
-    }
+    return PoolImpl.create(this.pm, name, this.attributes);
   }
 
   /**
@@ -487,19 +354,7 @@ public class PoolFactoryImpl implements PoolFactory {
     public transient LocatorDiscoveryCallback locatorCallback = null; //only used by tests
     public GatewaySender gatewaySender = null;
     /**
-     * True if this factory needs to produce a pool for use by BridgeWriter
-     * or BridgeLoader.
-     */
-    public boolean bridge = false;
-    /**
-     * True if bridge is true and the pool is used by a BridgeWriter.
-     * False if bridge is true and the pool is used by a BridgeLoader.
-     * Ignore this attribute if bridge is false.
-     */
-    public boolean bridgeWriter = false;
-    /**
-     * True if bridge is true and the pool is used by a Gateway.
-     * Ignore this attribute if bridge is false.
+     * True if the pool is used by a Gateway.
      */
     public boolean gateway = false;
 
@@ -554,18 +409,6 @@ public class PoolFactoryImpl implements PoolFactory {
     public String getServerGroup() {
       return this.serverGroup;
     }
-    public boolean isBridge() {
-      return this.bridge;
-    }
-    public void setBridge(boolean v) {
-      this.bridge = v;
-    }
-    public boolean isBridgeWriter() {
-      return this.bridgeWriter;
-    }
-    public void setBridgeWriter(boolean v) {
-      this.bridgeWriter = v;
-    }
     public boolean isGateway() {
       return this.gateway;
     }
@@ -648,7 +491,6 @@ public class PoolFactoryImpl implements PoolFactory {
       DataSerializer.writeString(this.serverGroup, out);
       DataSerializer.writeArrayList(this.locators, out);
       DataSerializer.writeArrayList(this.servers, out);
-      DataSerializer.writePrimitiveBoolean(this.bridge, out);
       DataSerializer.writePrimitiveInt(this.statisticInterval, out);
       DataSerializer.writePrimitiveBoolean(this.multiuserSecureModeEnabled,out);
     }
@@ -671,7 +513,6 @@ public class PoolFactoryImpl implements PoolFactory {
       this.serverGroup = DataSerializer.readString(in);
       this.locators = DataSerializer.readArrayList(in);
       this.servers = DataSerializer.readArrayList(in);
-      this.bridge = DataSerializer.readPrimitiveBoolean(in);
       this.statisticInterval= DataSerializer.readPrimitiveInt(in);
       this.multiuserSecureModeEnabled = DataSerializer.readPrimitiveBoolean(in);
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
index 289a3f5..25ba55d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
@@ -22,7 +22,6 @@ import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.client.Pool;
 import com.gemstone.gemfire.cache.client.PoolFactory;
 import com.gemstone.gemfire.cache.client.PoolManager;
-import com.gemstone.gemfire.cache.client.internal.BridgePoolImpl;
 import com.gemstone.gemfire.cache.client.internal.PoolImpl;
 import com.gemstone.gemfire.cache.client.internal.RegisterDataSerializersOp;
 import com.gemstone.gemfire.cache.client.internal.RegisterInstantiatorsOp;
@@ -102,24 +101,6 @@ public class PoolManagerImpl {
   }
   
   /**
-   * Set the keep alive flag before closing. Only for use with the deprecated
-   * BridgeWriter/Loader code. A BridgeWriter is automatically
-   * closed then the last region is disconnected from it,
-   * so we need to mark the connections as keep alive
-   * before we close the regions that use the bridge writer/loader
-   * 
-   * @param keepAlive
-   */
-  public static void setKeepAlive(boolean keepAlive) {
-    for(Iterator<Pool> itr = PoolManager.getAll().values().iterator(); itr.hasNext(); ) {
-      Pool nextPool = itr.next();
-      if(nextPool instanceof BridgePoolImpl) {
-        BridgePoolImpl bridgePool = (BridgePoolImpl) nextPool;
-        bridgePool.setKeepAlive(keepAlive);
-      }
-    }
-  }
-  /**
    * Destroys all created pool in this manager.
    */
   public void close(boolean keepAlive) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java
index 7d96986..08a6bad 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java
@@ -282,7 +282,7 @@ public class RegionEventImpl
   }
   
   public ClientProxyMembershipID getContext() {
-    // regular region events do not have bridge context - see BridgeRegionEventImpl
+    // regular region events do not have a context - see ClientRegionEventImpl
     return null;
   }