You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by vf...@apache.org on 2015/11/25 20:07:54 UTC
[41/50] [abbrv] incubator-geode git commit: GEODE-243: remove
deprecated Bridge feature
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java
new file mode 100644
index 0000000..646f336
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java
@@ -0,0 +1,164 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.server.ServerLoad;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.InternalDataSerializer;
+
+
+/**
+ * Used to give advice to a cache server.
+ * Cache server currently need to know about controller's
+ * @author darrel
+ *
+ */
+public class CacheServerAdvisor extends GridAdvisor {
+
+ private CacheServerAdvisor(DistributionAdvisee server) {
+ super(server);
+ }
+
+ public static CacheServerAdvisor createCacheServerAdvisor(DistributionAdvisee server) {
+ CacheServerAdvisor advisor = new CacheServerAdvisor(server);
+ advisor.initialize();
+ return advisor;
+ }
+
+ @Override
+ public String toString() {
+ return "CacheServerAdvisor for " + getAdvisee().getFullPath();
+ }
+
+ /** Instantiate new distribution profile for this member */
+ @Override
+ protected Profile instantiateProfile(
+ InternalDistributedMember memberId, int version) {
+ return new CacheServerProfile(memberId, version);
+ }
+
+ /**
+ * Describes a cache server for distribution purposes.
+ */
+ public static class CacheServerProfile extends GridAdvisor.GridProfile {
+ private String[] groups;
+ private int maxConnections;
+ private ServerLoad initialLoad;
+ private long loadPollInterval;
+
+ /** for internal use, required for DataSerializer.readObject */
+ public CacheServerProfile() {
+ }
+
+ public CacheServerProfile(InternalDistributedMember memberId, int version) {
+ super(memberId, version);
+ }
+
+ public CacheServerProfile(CacheServerProfile toCopy) {
+ super(toCopy);
+ this.groups = toCopy.groups;
+ }
+
+ /** don't modify the returned array! */
+ public String[] getGroups() {
+ return this.groups;
+ }
+ public void setGroups(String[] groups) {
+ this.groups = groups;
+ }
+
+ public ServerLoad getInitialLoad() {
+ return initialLoad;
+ }
+
+ public int getMaxConnections() {
+ return maxConnections;
+ }
+
+ public void setMaxConnections(int maxConnections) {
+ this.maxConnections = maxConnections;
+ }
+
+ public void setInitialLoad(ServerLoad initialLoad) {
+ this.initialLoad = initialLoad;
+ }
+ public long getLoadPollInterval() {
+ return this.loadPollInterval;
+ }
+ public void setLoadPollInterval(long v) {
+ this.loadPollInterval = v;
+ }
+
+ /**
+ * Used to process an incoming cache server profile. Any controller in this
+ * vm needs to be told about this incoming new cache server. The reply
+ * needs to contain any controller(s) that exist in this vm.
+ *
+ * @since 5.7
+ */
+ @Override
+ public void processIncoming(DistributionManager dm, String adviseePath,
+ boolean removeProfile, boolean exchangeProfiles,
+ final List<Profile> replyProfiles) {
+ // tell local controllers about this cache server
+ tellLocalControllers(removeProfile, exchangeProfiles, replyProfiles);
+ // for QRM messaging we need cache servers to know about each other
+ tellLocalBridgeServers(removeProfile, exchangeProfiles, replyProfiles);
+ }
+
+ @Override
+ public int getDSFID() {
+ return CACHE_SERVER_PROFILE;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+ super.toData(out);
+ DataSerializer.writeStringArray(this.groups, out);
+ out.writeInt(maxConnections);
+ InternalDataSerializer.invokeToData(initialLoad, out);
+ out.writeLong(getLoadPollInterval());
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ super.fromData(in);
+ this.groups = DataSerializer.readStringArray(in);
+ this.maxConnections = in.readInt();
+ this.initialLoad = new ServerLoad();
+ InternalDataSerializer.invokeFromData(initialLoad, in);
+ setLoadPollInterval(in.readLong());
+ }
+
+ @Override
+ public StringBuilder getToStringHeader() {
+ return new StringBuilder("BridgeServerProfile");
+ }
+
+ @Override
+ public void fillInToString(StringBuilder sb) {
+ super.fillInToString(sb);
+ if (this.groups != null) {
+ sb.append("; groups=" + Arrays.asList(this.groups));
+ sb.append("; maxConnections=" + maxConnections);
+ sb.append("; initialLoad=" + initialLoad);
+ sb.append("; loadPollInterval=" + getLoadPollInterval());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
new file mode 100644
index 0000000..422711e
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java
@@ -0,0 +1,812 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.CancelCriterion;
+import com.gemstone.gemfire.GemFireIOException;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.InvalidValueException;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.ClientSession;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.DynamicRegionFactory;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.ExpirationAction;
+import com.gemstone.gemfire.cache.ExpirationAttributes;
+import com.gemstone.gemfire.cache.InterestRegistrationListener;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig;
+import com.gemstone.gemfire.cache.server.ServerLoadProbe;
+import com.gemstone.gemfire.cache.server.internal.LoadMonitor;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
+import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.ResourceEvent;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.OSProcess;
+import com.gemstone.gemfire.internal.admin.ClientHealthMonitoringRegion;
+import com.gemstone.gemfire.internal.cache.CacheServerAdvisor.CacheServerProfile;
+import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
+import com.gemstone.gemfire.internal.cache.tier.Acceptor;
+import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl;
+import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.management.membership.ClientMembership;
+import com.gemstone.gemfire.management.membership.ClientMembershipListener;
+
+/**
+ * An implementation of the <code>CacheServer</code> interface that delegates
+ * most of the heavy lifting to an {@link Acceptor}.
+ *
+ * @author David Whitlock
+ * @since 4.0
+ */
+@SuppressWarnings("deprecation")
+public class CacheServerImpl
+ extends AbstractCacheServer
+ implements DistributionAdvisee {
+
+ private static final Logger logger = LogService.getLogger();
+
+ private static final int FORCE_LOAD_UPDATE_FREQUENCY= Integer.getInteger("gemfire.BridgeServer.FORCE_LOAD_UPDATE_FREQUENCY", 10).intValue();
+
+ /** The acceptor that does the actual serving */
+ private volatile AcceptorImpl acceptor;
+
+ /**
+ * The advisor used by this cache server.
+ * @since 5.7
+ */
+ private volatile CacheServerAdvisor advisor;
+
+ /**
+ * The monitor used to monitor load on this
+ * bridge server and distribute load to the locators
+ * @since 5.7
+ */
+ private volatile LoadMonitor loadMonitor;
+
+ /**
+ * boolean that represents whether this server is a GatewayReceiver or a simple BridgeServer
+ */
+ private boolean isGatewayReceiver;
+
+ private List<GatewayTransportFilter> gatewayTransportFilters = Collections.EMPTY_LIST;
+
+ /**
+ * Needed because this guy is an advisee
+ * @since 5.7
+ */
+ private int serialNumber; // changed on each start
+
+ public static final boolean ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE =
+ Boolean.getBoolean("gemfire.cache-server.enable-notify-by-subscription-false");
+
+
+ // ////////////////////// Constructors //////////////////////
+
+ /**
+ * Creates a new <code>BridgeServerImpl</code> that serves the contents of
+ * the give <code>Cache</code>. It has the default configuration.
+ */
+ public CacheServerImpl(GemFireCacheImpl cache, boolean isGatewayReceiver) {
+ super(cache);
+ this.isGatewayReceiver = isGatewayReceiver;
+ }
+
+ // //////////////////// Instance Methods ///////////////////
+
+ public CancelCriterion getCancelCriterion() {
+ return cache.getCancelCriterion();
+ }
+
+ /**
+ * Checks to see whether or not this bridge server is running. If so, an
+ * {@link IllegalStateException} is thrown.
+ */
+ private void checkRunning() {
+ if (this.isRunning()) {
+ throw new IllegalStateException(LocalizedStrings.CacheServerImpl_A_CACHE_SERVERS_CONFIGURATION_CANNOT_BE_CHANGED_ONCE_IT_IS_RUNNING.toLocalizedString());
+ }
+ }
+
+ public boolean isGatewayReceiver() {
+ return this.isGatewayReceiver;
+ }
+
+ @Override
+ public int getPort() {
+ if (this.acceptor != null) {
+ return this.acceptor.getPort();
+ }
+ else {
+ return super.getPort();
+ }
+ }
+
+ @Override
+ public void setPort(int port) {
+ checkRunning();
+ super.setPort(port);
+ }
+
+ @Override
+ public void setBindAddress(String address) {
+ checkRunning();
+ super.setBindAddress(address);
+ }
+ @Override
+ public void setHostnameForClients(String name) {
+ checkRunning();
+ super.setHostnameForClients(name);
+ }
+
+ @Override
+ public void setMaxConnections(int maxCon) {
+ checkRunning();
+ super.setMaxConnections(maxCon);
+ }
+
+ @Override
+ public void setMaxThreads(int maxThreads) {
+ checkRunning();
+ super.setMaxThreads(maxThreads);
+ }
+
+ @Override
+ public void setNotifyBySubscription(boolean b) {
+ checkRunning();
+ if (CacheServerImpl.ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE) {
+ this.notifyBySubscription = b;
+ }
+ }
+
+ @Override
+ public void setMaximumMessageCount(int maximumMessageCount) {
+ checkRunning();
+ super.setMaximumMessageCount(maximumMessageCount);
+ }
+
+ @Override
+ public void setSocketBufferSize(int socketBufferSize) {
+ this.socketBufferSize = socketBufferSize;
+ }
+
+ @Override
+ public int getSocketBufferSize() {
+ return this.socketBufferSize;
+ }
+
+ @Override
+ public void setMaximumTimeBetweenPings(int maximumTimeBetweenPings) {
+ this.maximumTimeBetweenPings = maximumTimeBetweenPings;
+ }
+
+ @Override
+ public int getMaximumTimeBetweenPings() {
+ return this.maximumTimeBetweenPings;
+ }
+
+
+ @Override
+ public void setLoadPollInterval(long loadPollInterval) {
+ checkRunning();
+ super.setLoadPollInterval(loadPollInterval);
+ }
+
+ @Override
+ public int getMaximumMessageCount() {
+ return this.maximumMessageCount;
+ }
+
+ @Override
+ public void setLoadProbe(ServerLoadProbe loadProbe) {
+ checkRunning();
+ super.setLoadProbe(loadProbe);
+ }
+
+ public void setGatewayTransportFilter(
+ List<GatewayTransportFilter> transportFilters) {
+ this.gatewayTransportFilters = transportFilters;
+ }
+
+ @Override
+ public int getMessageTimeToLive() {
+ return this.messageTimeToLive;
+ }
+
+
+ public ClientSubscriptionConfig getClientSubscriptionConfig(){
+ return this.clientSubscriptionConfig;
+ }
+
+ /**
+ * Sets the configuration of <b>this</b> <code>CacheServer</code> based on
+ * the configuration of <b>another</b> <code>CacheServer</code>.
+ */
+ public void configureFrom(CacheServer other) {
+ setPort(other.getPort());
+ setBindAddress(other.getBindAddress());
+ setHostnameForClients(other.getHostnameForClients());
+ setMaxConnections(other.getMaxConnections());
+ setMaxThreads(other.getMaxThreads());
+ setNotifyBySubscription(other.getNotifyBySubscription());
+ setSocketBufferSize(other.getSocketBufferSize());
+ setTcpNoDelay(other.getTcpNoDelay());
+ setMaximumTimeBetweenPings(other.getMaximumTimeBetweenPings());
+ setMaximumMessageCount(other.getMaximumMessageCount());
+ setMessageTimeToLive(other.getMessageTimeToLive());
+// setTransactionTimeToLive(other.getTransactionTimeToLive()); not implemented in CacheServer for v6.6
+ setGroups(other.getGroups());
+ setLoadProbe(other.getLoadProbe());
+ setLoadPollInterval(other.getLoadPollInterval());
+ ClientSubscriptionConfig cscOther = other.getClientSubscriptionConfig();
+ ClientSubscriptionConfig cscThis = this.getClientSubscriptionConfig();
+ // added for configuration of ha overflow
+ cscThis.setEvictionPolicy(cscOther.getEvictionPolicy());
+ cscThis.setCapacity(cscOther.getCapacity());
+ String diskStoreName = cscOther.getDiskStoreName();
+ if (diskStoreName != null) {
+ cscThis.setDiskStoreName(diskStoreName);
+ } else {
+ cscThis.setOverflowDirectory(cscOther.getOverflowDirectory());
+ }
+ }
+
+ @Override
+ public synchronized void start() throws IOException {
+ Assert.assertTrue(this.cache != null);
+ boolean isSqlFabricSystem = ((GemFireCacheImpl)this.cache).isSqlfSystem();
+
+ this.serialNumber = createSerialNumber();
+ if (DynamicRegionFactory.get().isOpen()) {
+ // force notifyBySubscription to be true so that meta info is pushed
+ // from servers to clients instead of invalidates.
+ if (!this.notifyBySubscription) {
+ logger.info(LocalizedMessage.create(LocalizedStrings.CacheServerImpl_FORCING_NOTIFYBYSUBSCRIPTION_TO_SUPPORT_DYNAMIC_REGIONS));
+ this.notifyBySubscription = true;
+ }
+ }
+ this.advisor = CacheServerAdvisor.createCacheServerAdvisor(this);
+ this.loadMonitor = new LoadMonitor(loadProbe, maxConnections,
+ loadPollInterval, FORCE_LOAD_UPDATE_FREQUENCY,
+ advisor);
+ List overflowAttributesList = new LinkedList();
+ ClientSubscriptionConfig csc = this.getClientSubscriptionConfig();
+ overflowAttributesList.add(0, csc.getEvictionPolicy());
+ overflowAttributesList.add(1, Integer.valueOf(csc.getCapacity()));
+ overflowAttributesList.add(2, Integer.valueOf(this.port));
+ String diskStoreName = csc.getDiskStoreName();
+ if (diskStoreName != null) {
+ overflowAttributesList.add(3, diskStoreName);
+ overflowAttributesList.add(4, true); // indicator to use diskstore
+ } else {
+ overflowAttributesList.add(3, csc.getOverflowDirectory());
+ overflowAttributesList.add(4, false);
+ }
+
+ this.acceptor = new AcceptorImpl(getPort(),
+ getBindAddress(),
+ getNotifyBySubscription(),
+ getSocketBufferSize(),
+ getMaximumTimeBetweenPings(),
+ this.cache,
+ getMaxConnections(),
+ getMaxThreads(),
+ getMaximumMessageCount(),
+ getMessageTimeToLive(),
+ getTransactionTimeToLive(),
+ this.loadMonitor,
+ overflowAttributesList,
+ isSqlFabricSystem,
+ this.isGatewayReceiver,
+ this.gatewayTransportFilters, this.tcpNoDelay);
+
+ this.acceptor.start();
+ this.advisor.handshake();
+ this.loadMonitor.start(new ServerLocation(getExternalAddress(),
+ getPort()), acceptor.getStats());
+
+ // TODO : Need to provide facility to enable/disable client health monitoring.
+ //Creating ClientHealthMonitoring region.
+ // Force initialization on current cache
+ if(cache instanceof GemFireCacheImpl) {
+ ClientHealthMonitoringRegion.getInstance((GemFireCacheImpl)cache);
+ }
+ this.cache.getLoggerI18n().config(LocalizedStrings.CacheServerImpl_CACHESERVER_CONFIGURATION___0, getConfig());
+
+ /*
+ * If the stopped bridge server is restarted, we'll need to re-register the
+ * client membership listener. If the listener is already registered it
+ * won't be registered as would the case when start() is invoked for the
+ * first time.
+ */
+ ClientMembershipListener[] membershipListeners =
+ ClientMembership.getClientMembershipListeners();
+
+ boolean membershipListenerRegistered = false;
+ for (ClientMembershipListener membershipListener : membershipListeners) {
+ //just checking by reference as the listener instance is final
+ if (listener == membershipListener) {
+ membershipListenerRegistered = true;
+ break;
+ }
+ }
+
+ if (!membershipListenerRegistered) {
+ ClientMembership.registerClientMembershipListener(listener);
+ }
+
+ if (!isGatewayReceiver) {
+ InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
+ .getDistributedSystem();
+ system.handleResourceEvent(ResourceEvent.CACHE_SERVER_START, this);
+ }
+
+ }
+
+
+ /**
+ * Gets the address that this bridge server can be contacted on from external
+ * processes.
+ * @since 5.7
+ */
+ public String getExternalAddress() {
+ return getExternalAddress(true);
+ }
+
+ public String getExternalAddress(boolean checkServerRunning) {
+ if (checkServerRunning) {
+ if (!this.isRunning()) {
+ String s = "A bridge server's bind address is only available if it has been started";
+ this.cache.getCancelCriterion().checkCancelInProgress(null);
+ throw new IllegalStateException(s);
+ }
+ }
+ if (this.hostnameForClients == null || this.hostnameForClients.equals("")) {
+ if (this.acceptor != null) {
+ return this.acceptor.getExternalAddress();
+ }
+ else {
+ return null;
+ }
+ }
+ else {
+ return this.hostnameForClients;
+ }
+ }
+
+ public boolean isRunning() {
+ return this.acceptor != null && this.acceptor.isRunning();
+ }
+
+ public synchronized void stop() {
+ if (!isRunning()) {
+ return;
+ }
+
+ RuntimeException firstException = null;
+
+ try {
+ if(this.loadMonitor != null) {
+ this.loadMonitor.stop();
+ }
+ } catch(RuntimeException e) {
+ cache.getLoggerI18n().warning(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_LOAD_MONITOR, e);
+ firstException = e;
+ }
+
+ try {
+ if (this.advisor != null) {
+ this.advisor.close();
+ }
+ } catch(RuntimeException e) {
+ cache.getLoggerI18n().warning(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_ADVISOR, e);
+ firstException = e;
+ }
+
+ try {
+ if (this.acceptor != null) {
+ this.acceptor.close();
+ }
+ } catch(RuntimeException e) {
+ logger.warn(LocalizedMessage.create(LocalizedStrings.CacheServerImpl_CACHESERVER_ERROR_CLOSING_ACCEPTOR_MONITOR), e);
+ if (firstException != null) {
+ firstException = e;
+ }
+ }
+
+ if(firstException != null) {
+ throw firstException;
+ }
+
+ //TODO : We need to clean up the admin region created for client
+ //monitoring.
+
+ // BridgeServer is still available, just not running, so we don't take
+ // it out of the cache's list...
+ // cache.removeBridgeServer(this);
+
+ /* Assuming start won't be called after stop */
+ ClientMembership.unregisterClientMembershipListener(listener);
+
+ TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager();
+ txMgr.removeHostedTXStatesForClients();
+
+ if (!isGatewayReceiver) {
+ InternalDistributedSystem system = ((GemFireCacheImpl) this.cache)
+ .getDistributedSystem();
+ system.handleResourceEvent(ResourceEvent.CACHE_SERVER_STOP, this);
+ }
+
+ }
+
+ private String getConfig() {
+ ClientSubscriptionConfig csc = this.getClientSubscriptionConfig();
+ String str =
+ "port=" + getPort() + " max-connections=" + getMaxConnections()
+ + " max-threads=" + getMaxThreads() + " notify-by-subscription="
+ + getNotifyBySubscription() + " socket-buffer-size="
+ + getSocketBufferSize() + " maximum-time-between-pings="
+ + getMaximumTimeBetweenPings() + " maximum-message-count="
+ + getMaximumMessageCount() + " message-time-to-live="
+ + getMessageTimeToLive() + " eviction-policy=" + csc.getEvictionPolicy()
+ + " capacity=" + csc.getCapacity() + " overflow directory=";
+ if (csc.getDiskStoreName() != null) {
+ str += csc.getDiskStoreName();
+ } else {
+ str += csc.getOverflowDirectory();
+ }
+ str +=
+ " groups=" + Arrays.asList(getGroups())
+ + " loadProbe=" + loadProbe
+ + " loadPollInterval=" + loadPollInterval
+ + " tcpNoDelay=" + tcpNoDelay;
+ return str;
+ }
+
+ @Override
+ public String toString() {
+ ClientSubscriptionConfig csc = this.getClientSubscriptionConfig();
+ String str =
+ "CacheServer on port=" + getPort() + " client subscription config policy="
+ + csc.getEvictionPolicy() + " client subscription config capacity="
+ + csc.getCapacity();
+ if (csc.getDiskStoreName() != null) {
+ str += " client subscription config overflow disk store="
+ + csc.getDiskStoreName();
+ } else {
+ str += " client subscription config overflow directory="
+ + csc.getOverflowDirectory();
+ }
+ return str;
+ }
+
+ /**
+ * Test method used to access the internal acceptor
+ *
+ * @return the internal acceptor
+ */
+ public AcceptorImpl getAcceptor() {
+ return this.acceptor;
+ }
+
+ // DistributionAdvisee methods
+
+ public DM getDistributionManager() {
+ return getSystem().getDistributionManager();
+ }
+
+ public ClientSession getClientSession(String durableClientId) {
+ return getCacheClientNotifier().getClientProxy(durableClientId);
+ }
+
+ public ClientSession getClientSession(DistributedMember member) {
+ return getCacheClientNotifier().getClientProxy(
+ ClientProxyMembershipID.getClientId(member));
+ }
+
+ public Set getAllClientSessions() {
+ return new HashSet(getCacheClientNotifier().getClientProxies());
+ }
+
+ /**
+ * create client subscription
+ *
+ * @param cache
+ * @param ePolicy
+ * @param capacity
+ * @param port
+ * @param overFlowDir
+ * @param isDiskStore
+ * @return client subscription name
+ * @since 5.7
+ */
+ public static String clientMessagesRegion(GemFireCacheImpl cache, String ePolicy,
+ int capacity, int port, String overFlowDir, boolean isDiskStore) {
+ AttributesFactory factory = getAttribFactoryForClientMessagesRegion(cache,
+ ePolicy, capacity, overFlowDir, isDiskStore);
+ RegionAttributes attr = factory.create();
+
+ return createClientMessagesRegion(attr, cache, capacity, port);
+ }
+
+ public static AttributesFactory getAttribFactoryForClientMessagesRegion(
+ GemFireCacheImpl cache,
+ String ePolicy, int capacity, String overflowDir, boolean isDiskStore)
+ throws InvalidValueException, GemFireIOException {
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+
+ if (isDiskStore) {
+ // overflowDir parameter is actually diskstore name
+ factory.setDiskStoreName(overflowDir);
+ // client subscription queue is always overflow to disk, so do async
+ // see feature request #41479
+ factory.setDiskSynchronous(true);
+ } else if (overflowDir == null || overflowDir.equals(ClientSubscriptionConfig.DEFAULT_OVERFLOW_DIRECTORY)) {
+ factory.setDiskStoreName(null);
+ // client subscription queue is always overflow to disk, so do async
+ // see feature request #41479
+ factory.setDiskSynchronous(true);
+ } else {
+ File dir = new File(overflowDir + File.separatorChar
+ + generateNameForClientMsgsRegion(OSProcess.getId()));
+ // This will delete the overflow directory when virtual machine terminates.
+ dir.deleteOnExit();
+ if (!dir.mkdirs() && !dir.isDirectory()) {
+ throw new GemFireIOException("Could not create client subscription overflow directory: "
+ + dir.getAbsolutePath());
+ }
+ File[] dirs = { dir };
+ DiskStoreFactory dsf = cache.createDiskStoreFactory();
+ DiskStore bsi = dsf.setAutoCompact(true)
+ .setDiskDirsAndSizes(dirs, new int[] { Integer.MAX_VALUE })
+ .create("bsi");
+ factory.setDiskStoreName("bsi");
+ // backward compatibility, it was sync
+ factory.setDiskSynchronous(true);
+ }
+ factory.setDataPolicy(DataPolicy.NORMAL);
+ // enable statistics
+ factory.setStatisticsEnabled(true);
+ /* setting LIFO related eviction attributes */
+ if (HARegionQueue.HA_EVICTION_POLICY_ENTRY.equals(ePolicy)) {
+ factory
+ .setEvictionAttributes(EvictionAttributesImpl.createLIFOEntryAttributes(
+ capacity, EvictionAction.OVERFLOW_TO_DISK));
+ }
+ else if (HARegionQueue.HA_EVICTION_POLICY_MEMORY.equals(ePolicy)) { // condition refinement
+ factory
+ .setEvictionAttributes(EvictionAttributesImpl.createLIFOMemoryAttributes(
+ capacity, EvictionAction.OVERFLOW_TO_DISK));
+ }
+ else {
+ // throw invalid eviction policy exception
+ throw new InvalidValueException(
+ LocalizedStrings.CacheServerImpl__0_INVALID_EVICTION_POLICY.toLocalizedString(ePolicy));
+ }
+ return factory;
+ }
+
+ public static String createClientMessagesRegion(RegionAttributes attr,
+ GemFireCacheImpl cache, int capacity, int port) {
+ // generating unique name in VM for ClientMessagesRegion
+ String regionName = generateNameForClientMsgsRegion(port);
+ try {
+ cache.createVMRegion(regionName, attr,
+ new InternalRegionArguments().setDestroyLockFlag(true)
+ .setRecreateFlag(false).setSnapshotInputStream(null)
+ .setImageTarget(null).setIsUsedForMetaRegion(true));
+ }
+ catch (RegionExistsException ree) {
+ InternalGemFireError assErr = new InternalGemFireError(
+ "unexpected exception");
+ assErr.initCause(ree);
+ throw assErr;
+ }
+ catch (IOException e) {
+ // only if loading snapshot, not here
+ InternalGemFireError assErr = new InternalGemFireError(
+ "unexpected exception");
+ assErr.initCause(e);
+ throw assErr;
+ }
+ catch (ClassNotFoundException e) {
+ // only if loading snapshot, not here
+ InternalGemFireError assErr = new InternalGemFireError(
+ "unexpected exception");
+ assErr.initCause(e);
+ throw assErr;
+ }
+ return regionName;
+ }
+
+ public static String createClientMessagesRegionForTesting(GemFireCacheImpl cache,
+ String ePolicy, int capacity, int port, int expiryTime, String overFlowDir, boolean isDiskStore) {
+ AttributesFactory factory = getAttribFactoryForClientMessagesRegion(cache,
+ ePolicy, capacity, overFlowDir, isDiskStore);
+ ExpirationAttributes ea = new ExpirationAttributes(expiryTime,
+ ExpirationAction.LOCAL_INVALIDATE);
+ factory.setEntryTimeToLive(ea);
+ RegionAttributes attr = factory.create();
+
+ return createClientMessagesRegion(attr, cache, capacity, port);
+ }
+
+ /**
+ * Generates the name for the client subscription using the given id.
+ *
+ * @param id
+ * @return String
+ * @since 5.7
+ */
+ public static String generateNameForClientMsgsRegion(int id) {
+ return ClientSubscriptionConfigImpl.CLIENT_SUBSCRIPTION + "_" + id;
+ }
+
+ /*
+ * Marker class name to identify the lock more easily in thread dumps private
+ * static class ClientMessagesRegionLock extends Object { }
+ */
+ public DistributionAdvisor getDistributionAdvisor() {
+ return this.advisor;
+ }
+
+ /**
+ * Returns the BridgeServerAdvisor for this server
+ */
+ public CacheServerAdvisor getCacheServerAdvisor() {
+ return this.advisor;
+ }
+
+ public Profile getProfile() {
+ return getDistributionAdvisor().createProfile();
+ }
+
+ public DistributionAdvisee getParentAdvisee() {
+ return null;
+ }
+
+ /**
+ * Returns the underlying <code>InternalDistributedSystem</code> connection.
+ * @return the underlying <code>InternalDistributedSystem</code>
+ */
+ public InternalDistributedSystem getSystem() {
+ return (InternalDistributedSystem)this.cache.getDistributedSystem();
+ }
+
+ public String getName() {
+ return "CacheServer";
+ }
+
+ public String getFullPath() {
+ return getName();
+ }
+
+ private final static AtomicInteger profileSN = new AtomicInteger();
+
+ private static int createSerialNumber() {
+ return profileSN.incrementAndGet();
+ }
+
+ /**
+ * Returns an array of all the groups of this bridge server.
+ * This includes those from the groups gemfire property
+ * and those explicitly added to this server.
+ */
+ public String[] getCombinedGroups() {
+ ArrayList<String> groupList = new ArrayList<String>();
+ for (String g: MemberAttributes.parseGroups(null, getSystem().getConfig().getGroups())) {
+ if (!groupList.contains(g)) {
+ groupList.add(g);
+ }
+ }
+ for (String g: getGroups()) {
+ if (!groupList.contains(g)) {
+ groupList.add(g);
+ }
+ }
+ String[] groups = new String[groupList.size()];
+ return groupList.toArray(groups);
+ }
+
+ public /*synchronized causes deadlock*/ void fillInProfile(Profile profile) {
+ assert profile instanceof CacheServerProfile;
+ CacheServerProfile bp = (CacheServerProfile)profile;
+ bp.setHost(getExternalAddress(false));
+ bp.setPort(getPort());
+ bp.setGroups(getCombinedGroups());
+ bp.setMaxConnections(maxConnections);
+ bp.setInitialLoad(loadMonitor.getLastLoad());
+ bp.setLoadPollInterval(getLoadPollInterval());
+ bp.serialNumber = getSerialNumber();
+ bp.finishInit();
+ }
+
+ public int getSerialNumber() {
+ return this.serialNumber;
+ }
+
+
+ protected CacheClientNotifier getCacheClientNotifier() {
+ return getAcceptor().getCacheClientNotifier();
+ }
+
+ /**
+ * Registers a new <code>InterestRegistrationListener</code> with the set of
+ * <code>InterestRegistrationListener</code>s.
+ *
+ * @param listener
+ * The <code>InterestRegistrationListener</code> to register
+ * @throws IllegalStateException if the BridgeServer has not been started
+ * @since 5.8Beta
+ */
+ public void registerInterestRegistrationListener(
+ InterestRegistrationListener listener) {
+ if (!this.isRunning()) {
+ throw new IllegalStateException(LocalizedStrings.CacheServerImpl_MUST_BE_RUNNING.toLocalizedString());
+ }
+ getCacheClientNotifier().registerInterestRegistrationListener(listener);
+ }
+
+ /**
+ * Unregisters an existing <code>InterestRegistrationListener</code> from
+ * the set of <code>InterestRegistrationListener</code>s.
+ *
+ * @param listener
+ * The <code>InterestRegistrationListener</code> to
+ * unregister
+ *
+ * @since 5.8Beta
+ */
+ public void unregisterInterestRegistrationListener(
+ InterestRegistrationListener listener) {
+ getCacheClientNotifier().unregisterInterestRegistrationListener(listener);
+ }
+
+ /**
+ * Returns a read-only set of <code>InterestRegistrationListener</code>s
+ * registered with this notifier.
+ *
+ * @return a read-only set of <code>InterestRegistrationListener</code>s
+ * registered with this notifier
+ *
+ * @since 5.8Beta
+ */
+ public Set getInterestRegistrationListeners() {
+ return getCacheClientNotifier().getInterestRegistrationListeners();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientRegionEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientRegionEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientRegionEventImpl.java
new file mode 100755
index 0000000..176ddcb
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientRegionEventImpl.java
@@ -0,0 +1,108 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+
+//import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
+
+/**
+ * Class <code>ClientRegionEventImpl</code> is a
+ * region event with the client's
+ * host and port for notification purposes.
+ *
+ * @author Girish Thombare
+ *
+ * @since 5.1
+ */
+public final class ClientRegionEventImpl extends RegionEventImpl
+ {
+
+ /**
+ * The originating membershipId of this event.
+ */
+ private ClientProxyMembershipID context;
+
+ public ClientRegionEventImpl() {
+ }
+
+ /**
+ * To be called from the Distributed Message without setting EventID
+ * @param region
+ * @param op
+ * @param callbackArgument
+ * @param originRemote
+ * @param distributedMember
+ */
+ public ClientRegionEventImpl(LocalRegion region, Operation op, Object callbackArgument,boolean originRemote, DistributedMember distributedMember,ClientProxyMembershipID contx) {
+ super(region, op,callbackArgument, originRemote,distributedMember);
+ setContext(contx);
+ }
+
+ public ClientRegionEventImpl(LocalRegion region, Operation op, Object callbackArgument,boolean originRemote, DistributedMember distributedMember,ClientProxyMembershipID contx,EventID eventId) {
+ super(region, op,callbackArgument, originRemote,distributedMember, eventId);
+ setContext(contx);
+ }
+
+
+ /**
+ * sets The membershipId originating this event
+ *
+ */
+ protected void setContext(ClientProxyMembershipID contx)
+ {
+ this.context = contx;
+ }
+
+ /**
+ * Returns The context originating this event
+ *
+ * @return The context originating this event
+ */
+ @Override
+ public ClientProxyMembershipID getContext()
+ {
+ return this.context;
+ }
+
+ @Override
+ public String toString()
+ {
+ String superStr = super.toString();
+ StringBuffer buffer = new StringBuffer();
+ String str = superStr.substring(0, superStr.length() - 1);
+ buffer.append(str).append(";context=").append(getContext()).append(']');
+ return buffer.toString();
+ }
+
+ @Override
+ public int getDSFID() {
+ return CLIENT_REGION_EVENT;
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException
+ {
+ super.toData(out);
+ DataSerializer.writeObject(getContext(), out);
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException
+ {
+ super.fromData(in);
+ setContext(ClientProxyMembershipID.readCanonicalized(in));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserver.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserver.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserver.java
new file mode 100755
index 0000000..0646f04
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserver.java
@@ -0,0 +1,90 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * This interface is used by testing/debugging code to be notified of different
+ * client/server events.
+ * See the documentation for class ClientServerObserverHolder for details.
+ *
+ * @author Yogesh Mahajan
+ * @since 5.1
+ *
+ */
+public interface ClientServerObserver
+{
+ /**
+ * This callback is called when now primary Ep is identified.
+ */
+ public void afterPrimaryIdentificationFromBackup(ServerLocation location);
+
+ /**
+ * This callback is called just before interest registartion
+ */
+ public void beforeInterestRegistration();
+
+ /**
+ * This callback is called just after interest registartion
+ */
+ public void afterInterestRegistration();
+
+ /**
+ * This callback is called just before primary identification
+ */
+ public void beforePrimaryIdentificationFromBackup();
+
+ /**
+ * This callback is called just before Interest Recovery by DSM thread happens
+ */
+ public void beforeInterestRecovery();
+
+ /**
+ * Invoked by CacheClientUpdater just before invoking endpointDied for
+ * fail over
+ * @param location ServerLocation which has failed
+ */
+ public void beforeFailoverByCacheClientUpdater(ServerLocation location);
+ /**
+ * Invoked before sending an instantiator message to server
+ *
+ * @param eventId
+ */
+ public void beforeSendingToServer(EventID eventId);
+ /**
+ * Invoked after sending an instantiator message to server
+ *
+ * @param eventId
+ */
+ public void afterReceivingFromServer(EventID eventId);
+
+ /**
+ * This callback is called just before sending client ack to the primary servrer.
+ */
+ public void beforeSendingClientAck();
+
+ /**
+ * Invoked after Message is created
+ *
+ * @param msg
+ */
+ public void afterMessageCreation(Message msg);
+
+ /**
+ * Invoked after Queue Destroy Message has been sent
+ */
+ public void afterQueueDestroyMessage();
+
+ /**
+ * Invoked after a primary is recovered from a backup or new connection.
+ */
+ public void afterPrimaryRecovered(ServerLocation location);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverAdapter.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverAdapter.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverAdapter.java
new file mode 100755
index 0000000..094bb58
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverAdapter.java
@@ -0,0 +1,107 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+
+/**
+ * This class provides 'do-nothing' implementations of all of the methods of
+ * interface ClientServerObserver. See the documentation for class
+ * ClientServerObserverHolder for details.
+ *
+ * @author Yogesh Mahajan
+ * @since 5.1
+ */
+public class ClientServerObserverAdapter implements ClientServerObserver
+{
+ /**
+ * This callback is called when now primary Ep is identified.
+ */
+ public void afterPrimaryIdentificationFromBackup(ServerLocation primaryEndpoint)
+ {
+ }
+
+ /**
+ * This callback is called just before interest registartion
+ */
+ public void beforeInterestRegistration()
+ {
+ }
+
+ /**
+ * This callback is called just after interest registartion
+ */
+ public void afterInterestRegistration()
+ {
+ }
+
+ /**
+ * This callback is called just before primary identification
+ */
+ public void beforePrimaryIdentificationFromBackup()
+ {
+ }
+
+ /**
+ * This callback is called just before Interest Recovery by DSM thread happens
+ */
+ public void beforeInterestRecovery()
+ {
+
+ }
+
+ public void beforeFailoverByCacheClientUpdater(ServerLocation epFailed)
+ {
+ }
+ /**
+ * Invoked before sending an instantiator message to server
+ *
+ * @param eventId
+ */
+ public void beforeSendingToServer(EventID eventId){
+
+ }
+ /**
+ * Invoked after sending an instantiator message to server
+ *
+ * @param eventId
+ */
+ public void afterReceivingFromServer(EventID eventId){
+
+ }
+
+ /**
+ * This callback is called just before sending client ack to the primary servrer.
+ */
+ public void beforeSendingClientAck(){
+
+ }
+
+ /**
+ * Invoked after Message is created
+ *
+ * @param msg
+ */
+ public void afterMessageCreation(Message msg){
+
+ }
+
+ /**
+ * Invoked after Queue Destroy Message has been sent
+ */
+ public void afterQueueDestroyMessage(){
+
+ }
+
+ /**
+ * Invoked after a primary is recovered from a backup or new connection.
+ */
+ public void afterPrimaryRecovered(ServerLocation location) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverHolder.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverHolder.java
new file mode 100755
index 0000000..003852b
--- /dev/null
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ClientServerObserverHolder.java
@@ -0,0 +1,53 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import com.gemstone.gemfire.cache.query.internal.Support;
+
+/**
+ * This class is intended to hold a single 'observer' which will receive
+ * callbacks. There can be only one such observer at a time. If no observer is
+ * needed, this member variable should point to an object with 'do-nothing'
+ * methods, such as ClientServerObserverAdapter.
+ *
+ * @author Yogesh Mahajan
+ * @since 5.1
+ */
+public class ClientServerObserverHolder
+ {
+
+ /**
+ * The default 'do-nothing' bridge observer *
+ */
+ private static final ClientServerObserver NO_OBSERVER = new ClientServerObserverAdapter();
+
+ /**
+ * The current observer which will be notified of all query events.
+ */
+ private static ClientServerObserver _instance = NO_OBSERVER;
+
+ /**
+ * Set the given observer to be notified of events. Returns the current
+ * observer.
+ */
+ public static final ClientServerObserver setInstance(ClientServerObserver observer)
+ {
+ Support.assertArg(observer != null,
+ "setInstance expects a non-null argument!");
+ ClientServerObserver oldObserver = _instance;
+ _instance = observer;
+ return oldObserver;
+ }
+
+ /** Return the current BridgeObserver instance */
+ public static final ClientServerObserver getInstance()
+ {
+ return _instance;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java
index 921cbf9..198803d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DestroyRegionOperation.java
@@ -105,9 +105,9 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
protected CacheOperationMessage createMessage()
{
DestroyRegionMessage mssg;
- if (this.event instanceof BridgeRegionEventImpl) {
+ if (this.event instanceof ClientRegionEventImpl) {
mssg = new DestroyRegionWithContextMessage();
- ((DestroyRegionWithContextMessage)mssg).context = ((BridgeRegionEventImpl)this.event)
+ ((DestroyRegionWithContextMessage)mssg).context = ((ClientRegionEventImpl)this.event)
.getContext();
}
else {
@@ -502,7 +502,7 @@ public class DestroyRegionOperation extends DistributedCacheOperation {
@Override
final public RegionEventImpl createRegionEvent(DistributedRegion rgn)
{
- BridgeRegionEventImpl event = new BridgeRegionEventImpl(rgn,
+ ClientRegionEventImpl event = new ClientRegionEventImpl(rgn,
getOperation(), this.callbackArg, true /* originRemote */,
getSender(), (ClientProxyMembershipID)this.context);
return event;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
index 4dbc9c4..8056120 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedClearOperation.java
@@ -128,9 +128,9 @@ public class DistributedClearOperation extends DistributedCacheOperation
protected CacheOperationMessage createMessage()
{
ClearRegionMessage mssg;
- if (this.event instanceof BridgeRegionEventImpl) {
+ if (this.event instanceof ClientRegionEventImpl) {
mssg = new ClearRegionWithContextMessage();
- ((ClearRegionWithContextMessage)mssg).context = ((BridgeRegionEventImpl)this.event)
+ ((ClearRegionWithContextMessage)mssg).context = ((ClientRegionEventImpl)this.event)
.getContext();
}
@@ -271,7 +271,7 @@ public class DistributedClearOperation extends DistributedCacheOperation
final public RegionEventImpl createRegionEvent(DistributedRegion rgn)
{
- BridgeRegionEventImpl event = new BridgeRegionEventImpl(rgn,
+ ClientRegionEventImpl event = new ClientRegionEventImpl(rgn,
getOperation(), this.callbackArg, true /* originRemote */,
getSender(), (ClientProxyMembershipID)this.context);
return event;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
index 790dc4d..77fbc88 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DistributedRegion.java
@@ -2893,9 +2893,6 @@ public class DistributedRegion extends LocalRegion implements
protected void cacheWriterChanged(CacheWriter oldWriter)
{
super.cacheWriterChanged(oldWriter);
- if (isBridgeWriter(oldWriter)) {
- oldWriter = null;
- }
if (oldWriter == null ^ basicGetWriter() == null) {
new UpdateAttributesProcessor(this).distribute();
}
@@ -2905,9 +2902,6 @@ public class DistributedRegion extends LocalRegion implements
protected void cacheLoaderChanged(CacheLoader oldLoader)
{
super.cacheLoaderChanged(oldLoader);
- if (isBridgeLoader(oldLoader)) {
- oldLoader = null;
- }
if (oldLoader == null ^ basicGetLoader() == null) {
new UpdateAttributesProcessor(this).distribute();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
index 5c428b2..357c0a8 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ExpiryTask.java
@@ -28,7 +28,6 @@ import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.ExpirationAction;
import com.gemstone.gemfire.cache.ExpirationAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
-import com.gemstone.gemfire.cache.util.BridgeWriterException;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.PooledExecutorWithDMStats;
import com.gemstone.gemfire.internal.SystemTimer;
@@ -364,23 +363,6 @@ public abstract class ExpiryTask extends SystemTimer.SystemTimerTask {
}
catch (CancelException ex) {
// ignore
-
- // @todo grid: do we need to deal with pool exceptions here?
- } catch (BridgeWriterException ex) {
- // Some exceptions from the bridge writer should not be logged.
- Throwable cause = ex.getCause();
- // BridgeWriterExceptions from the server are wrapped in CacheWriterExceptions
- if (cause != null && cause instanceof CacheWriterException)
- cause = cause.getCause();
- if (cause instanceof RegionDestroyedException ||
- cause instanceof EntryNotFoundException ||
- cause instanceof CancelException) {
- if (logger.isDebugEnabled()) {
- logger.debug("Exception in expiration task", ex);
- }
- } else {
- logger.fatal(LocalizedMessage.create(LocalizedStrings.ExpiryTask_EXCEPTION_IN_EXPIRATION_TASK), ex);
- }
}
catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java
index 1366f94..8d782a9 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java
@@ -76,11 +76,11 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 {
private static void findLocalDurableQueues(ClientProxyMembershipID proxyId, ArrayList<ServerLocation> matches) {
Cache c = GemFireCacheImpl.getInstance();
if(c!=null) {
- List l = c.getBridgeServers();
+ List l = c.getCacheServers();
if(l!=null) {
Iterator i = l.iterator();
while(i.hasNext()) {
- BridgeServerImpl bs = (BridgeServerImpl)i.next();
+ CacheServerImpl bs = (CacheServerImpl)i.next();
if(bs.getAcceptor().getCacheClientNotifier().getClientProxy(proxyId)!=null) {
ServerLocation loc = new ServerLocation(bs.getExternalAddress(),bs.getPort());
matches.add(loc);
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 79bcbc2..4bf0f42 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
@@ -136,7 +136,6 @@ import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider;
import com.gemstone.gemfire.cache.server.CacheServer;
import com.gemstone.gemfire.cache.snapshot.CacheSnapshotService;
-import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.cache.util.GatewayConflictResolver;
import com.gemstone.gemfire.cache.util.ObjectSizer;
import com.gemstone.gemfire.cache.wan.GatewayReceiver;
@@ -371,11 +370,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
private volatile DistributionAdvisee sqlfAdvisee;
/**
- * the list of all bridge servers. CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval
- * operations. It is assumed that the traversal operations on bridge servers list vastly outnumber the mutative
+ * the list of all cache servers. CopyOnWriteArrayList is used to allow concurrent add, remove and retrieval
+ * operations. It is assumed that the traversal operations on cache servers list vastly outnumber the mutative
* operations such as add, remove.
*/
- private volatile List allBridgeServers = new CopyOnWriteArrayList();
+ private volatile List allCacheServers = new CopyOnWriteArrayList();
/**
* Controls updates to the list of all gateway senders
@@ -664,7 +663,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
sb.append("; lockLease = " + this.lockLease);
sb.append("; lockTimeout = " + this.lockTimeout);
// sb.append("; rootRegions = (" + this.rootRegions + ")");
- // sb.append("; bridgeServers = (" + this.bridgeServers + ")");
+ // sb.append("; cacheServers = (" + this.cacheServers + ")");
// sb.append("; regionAttributes = (" + this.listRegionAttributes());
// sb.append("; gatewayHub = " + gatewayHub);
if (this.creationStack != null) {
@@ -1513,7 +1512,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
/**
- * Close the distributed system, bridge servers, and gateways. Clears the rootRegions and partitionedRegions map.
+ * Close the distributed system, cache servers, and gateways. Clears the rootRegions and partitionedRegions map.
* Marks the cache as closed.
*
* @see SystemFailure#emergencyClose()
@@ -1546,14 +1545,14 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
inst.disconnectCause = SystemFailure.getFailure();
inst.isClosing = true;
- // Clear bridge servers
+ // Clear cache servers
if (DEBUG) {
- System.err.println("DEBUG: Close bridge servers");
+ System.err.println("DEBUG: Close cache servers");
}
{
- Iterator allBridgeServersItr = inst.allBridgeServers.iterator();
- while (allBridgeServersItr.hasNext()) {
- BridgeServerImpl bs = (BridgeServerImpl) allBridgeServersItr.next();
+ Iterator allCacheServersItr = inst.allCacheServers.iterator();
+ while (allCacheServersItr.hasNext()) {
+ CacheServerImpl bs = (CacheServerImpl) allCacheServersItr.next();
AcceptorImpl ai = bs.getAcceptor();
if (ai != null) {
ai.emergencyClose();
@@ -1986,7 +1985,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
TXStateProxy tx = null;
try {
this.keepAlive = keepalive;
- PoolManagerImpl.setKeepAlive(keepalive);
if (this.txMgr != null) {
tx = this.txMgr.internalSuspend();
@@ -2044,7 +2042,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
stopRedisServer();
// no need to track PR instances since we won't create any more
- // bridgeServers or gatewayHubs
+ // cacheServers or gatewayHubs
if (this.partitionedRegions != null) {
if (isDebugEnabled) {
logger.debug("{}: clearing partitioned regions...", this);
@@ -2616,12 +2614,12 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
- logger.debug("{}: stopping bridge servers...", this);
+ logger.debug("{}: stopping cache servers...", this);
}
- boolean stoppedBridgeServer = false;
- Iterator allBridgeServersIterator = this.allBridgeServers.iterator();
- while (allBridgeServersIterator.hasNext()) {
- BridgeServerImpl bridge = (BridgeServerImpl) allBridgeServersIterator.next();
+ boolean stoppedCacheServer = false;
+ Iterator allCacheServersIterator = this.allCacheServers.iterator();
+ while (allCacheServersIterator.hasNext()) {
+ CacheServerImpl bridge = (CacheServerImpl) allCacheServersIterator.next();
if (isDebugEnabled) {
logger.debug("stopping bridge {}", bridge);
}
@@ -2632,11 +2630,11 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
logger.debug("Ignored cache closure while closing bridge {}", bridge, e);
}
}
- allBridgeServers.remove(bridge);
- stoppedBridgeServer = true;
+ allCacheServers.remove(bridge);
+ stoppedCacheServer = true;
}
- if (stoppedBridgeServer) {
- // now that all the bridge servers have stopped empty the static pool of commBuffers it might have used.
+ if (stoppedCacheServer) {
+ // now that all the cache servers have stopped empty the static pool of commBuffers it might have used.
ServerConnection.emptyCommBufferPool();
}
@@ -3784,10 +3782,6 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
return this.eventThreadPool;
}
- public BridgeServer addBridgeServer() {
- return (BridgeServer) addCacheServer();
- }
-
public CacheServer addCacheServer() {
return addCacheServer(false);
}
@@ -3798,8 +3792,8 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
stopper.checkCancelInProgress(null);
- BridgeServerImpl bridge = new BridgeServerImpl(this, isGatewayReceiver);
- allBridgeServers.add(bridge);
+ CacheServerImpl bridge = new CacheServerImpl(this, isGatewayReceiver);
+ allCacheServers.add(bridge);
sendAddCacheServerProfileMessage();
return bridge;
@@ -3972,33 +3966,29 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
}
}
- public List getBridgeServers() {
- return getCacheServers();
- }
-
public List getCacheServers() {
- List bridgeServersWithoutReceiver = null;
- if (!allBridgeServers.isEmpty()) {
- Iterator allBridgeServersIterator = allBridgeServers.iterator();
- while (allBridgeServersIterator.hasNext()) {
- BridgeServerImpl bridgeServer = (BridgeServerImpl) allBridgeServersIterator.next();
- // If BridgeServer is a GatewayReceiver, don't return as part of CacheServers
- if (!bridgeServer.isGatewayReceiver()) {
- if (bridgeServersWithoutReceiver == null) {
- bridgeServersWithoutReceiver = new ArrayList();
+ List cacheServersWithoutReceiver = null;
+ if (!allCacheServers.isEmpty()) {
+ Iterator allCacheServersIterator = allCacheServers.iterator();
+ while (allCacheServersIterator.hasNext()) {
+ CacheServerImpl cacheServer = (CacheServerImpl) allCacheServersIterator.next();
+ // If CacheServer is a GatewayReceiver, don't return as part of CacheServers
+ if (!cacheServer.isGatewayReceiver()) {
+ if (cacheServersWithoutReceiver == null) {
+ cacheServersWithoutReceiver = new ArrayList();
}
- bridgeServersWithoutReceiver.add(bridgeServer);
+ cacheServersWithoutReceiver.add(cacheServer);
}
}
}
- if (bridgeServersWithoutReceiver == null) {
- bridgeServersWithoutReceiver = Collections.emptyList();
+ if (cacheServersWithoutReceiver == null) {
+ cacheServersWithoutReceiver = Collections.emptyList();
}
- return bridgeServersWithoutReceiver;
+ return cacheServersWithoutReceiver;
}
- public List getBridgeServersAndGatewayReceiver() {
- return allBridgeServers;
+ public List getCacheServersAndGatewayReceiver() {
+ return allCacheServers;
}
/**
@@ -4126,9 +4116,9 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
boolean hasSerialSenders = hasSerialSenders(r);
boolean result = hasSerialSenders;
if (!result) {
- Iterator allBridgeServersIterator = allBridgeServers.iterator();
- while (allBridgeServersIterator.hasNext()) {
- BridgeServerImpl server = (BridgeServerImpl) allBridgeServersIterator.next();
+ Iterator allCacheServersIterator = allCacheServers.iterator();
+ while (allCacheServersIterator.hasNext()) {
+ CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next();
if (!server.getNotifyBySubscription()) {
result = true;
break;
@@ -4182,7 +4172,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer
stopper.checkCancelInProgress(null);
if (!this.isServer) {
- return (this.allBridgeServers.size() > 0);
+ return (this.allCacheServers.size() > 0);
} else {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java
index acafd6d..13d6068 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java
@@ -45,7 +45,7 @@ public abstract class GridAdvisor extends DistributionAdvisor {
private static final Filter BRIDGE_SERVER_FILTER = new Filter() {
public boolean include(Profile profile) {
- return profile instanceof BridgeServerAdvisor.BridgeServerProfile;
+ return profile instanceof CacheServerAdvisor.CacheServerProfile;
}
};
@@ -327,9 +327,9 @@ public abstract class GridAdvisor extends DistributionAdvisor {
boolean exchangeProfiles, final List<Profile> replyProfiles) {
final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
if (cache != null && !cache.isClosed()) {
- List<?> bridgeServers = cache.getBridgeServersAndGatewayReceiver();
+ List<?> bridgeServers = cache.getCacheServersAndGatewayReceiver();
for (int i = 0; i < bridgeServers.size(); i++) {
- BridgeServerImpl bsi = (BridgeServerImpl)bridgeServers.get(i);
+ CacheServerImpl bsi = (CacheServerImpl)bridgeServers.get(i);
if (bsi.isRunning()) {
if(bsi.getProfile().equals(this)) {
continue;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 7c1ec89..9e5bcd2 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -100,7 +100,6 @@ import com.gemstone.gemfire.cache.TransactionId;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.ServerOperationException;
import com.gemstone.gemfire.cache.client.SubscriptionNotEnabledException;
-import com.gemstone.gemfire.cache.client.internal.BridgePoolImpl;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.Endpoint;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
@@ -134,7 +133,6 @@ import com.gemstone.gemfire.cache.query.internal.index.IndexCreationData;
import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
import com.gemstone.gemfire.cache.query.internal.index.IndexProtocol;
import com.gemstone.gemfire.cache.query.internal.index.IndexUtils;
-import com.gemstone.gemfire.cache.util.BridgeWriterException;
import com.gemstone.gemfire.cache.util.ObjectSizer;
import com.gemstone.gemfire.cache.wan.GatewaySender;
import com.gemstone.gemfire.distributed.DistributedMember;
@@ -674,9 +672,7 @@ public class LocalRegion extends AbstractRegion
}
// initialize client to server proxy
- this.srp = ((this.getPoolName() != null)
- || isBridgeLoader(this.getCacheLoader())
- || isBridgeWriter(this.getCacheWriter()))
+ this.srp = (this.getPoolName() != null)
? new ServerRegionProxy(this)
: null;
this.imageState =
@@ -3986,22 +3982,6 @@ public class LocalRegion extends AbstractRegion
reinitialize(inputStream, event);
}
-// public void createRegionOnServer() throws CacheWriterException
-// {
-// if (basicGetWriter() instanceof BridgeWriter) {
-// if (getParentRegion() != null) {
-// BridgeWriter bw = (BridgeWriter)basicGetWriter();
-// bw.createRegionOnServer(getParentRegion().getFullPath(), getName());
-// }
-// else {
-// throw new CacheWriterException(LocalizedStrings.LocalRegion_REGION_0_IS_A_ROOT_REGION_ONLY_NONROOT_REGIONS_CAN_BE_CREATED_ON_THE_SERVER.toLocalizedString(getFullPath()));
-// }
-// }
-// else {
-// throw new CacheWriterException(LocalizedStrings.LocalRegion_SERVER_REGION_CREATION_IS_ONLY_SUPPORTED_ON_CLIENT_SERVER_TOPOLOGIES_THE_CURRENT_CACHEWRITER_IS_0.toLocalizedString(this.cacheWriter));
-// }
-// }
-
public void registerInterest(Object key)
{
registerInterest(key, false);
@@ -4068,13 +4048,8 @@ public class LocalRegion extends AbstractRegion
throw new IllegalStateException(LocalizedStrings.LocalRegion_DURABLE_FLAG_ONLY_APPLICABLE_FOR_DURABLE_CLIENTS.toLocalizedString());
}
if (!proxy.getPool().getSubscriptionEnabled()) {
- if (proxy.getPool() instanceof BridgePoolImpl) {
- String msg = "Interest registration requires establishCallbackConnection to be set to true.";
- throw new BridgeWriterException(msg);
- } else {
- String msg = "Interest registration requires a pool whose queue is enabled.";
- throw new SubscriptionNotEnabledException(msg);
- }
+ String msg = "Interest registration requires a pool whose queue is enabled.";
+ throw new SubscriptionNotEnabledException(msg);
}
if (getAttributes().getDataPolicy().withReplication() // fix for bug 36185
@@ -4101,7 +4076,7 @@ public class LocalRegion extends AbstractRegion
this.clearKeysOfInterest(key, interestType, pol);
// Checking for the Dunit test(testRegisterInterst_Destroy_Concurrent) flag
if (PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG) {
- BridgeObserver bo = BridgeObserverHolder.getInstance();
+ ClientServerObserver bo = ClientServerObserverHolder.getInstance();
bo.beforeInterestRegistration();
}// Test Code Ends
final byte regionDataPolicy = getAttributes().getDataPolicy().ordinal;
@@ -9786,7 +9761,7 @@ public class LocalRegion extends AbstractRegion
}
}
- RegionEventImpl event = new BridgeRegionEventImpl(this, Operation.REGION_DESTROY,
+ RegionEventImpl event = new ClientRegionEventImpl(this, Operation.REGION_DESTROY,
callbackArg,false, client.getDistributedMember(), client/* context */, eventId);
basicDestroyRegion(event, true);
@@ -9811,7 +9786,7 @@ public class LocalRegion extends AbstractRegion
}
}
- RegionEventImpl event = new BridgeRegionEventImpl(this, Operation.REGION_CLEAR,
+ RegionEventImpl event = new ClientRegionEventImpl(this, Operation.REGION_CLEAR,
callbackArg,false, client.getDistributedMember(), client/* context */, eventId);
basicClear(event, true);
@@ -11310,7 +11285,7 @@ public class LocalRegion extends AbstractRegion
*/
protected boolean shouldNotifyBridgeClients()
{
- return (this.cache.getBridgeServers().size() > 0)
+ return (this.cache.getCacheServers().size() > 0)
&& !this.isUsedForPartitionedRegionAdmin
&& !this.isUsedForPartitionedRegionBucket
&& !this.isUsedForMetaRegion;
@@ -11444,7 +11419,7 @@ public class LocalRegion extends AbstractRegion
predicate = predicate.trim();
// Compare the query patterns to the 'predicate'. If one matches,
- // send it as is to the BridgeLoader
+ // send it as is to the server
boolean matches = false;
for (int i=0; i<QUERY_PATTERNS.length; i++) {
if (QUERY_PATTERNS[i].matcher(predicate).matches()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
index c007891..629c5a4 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java
@@ -5457,9 +5457,6 @@ public class PartitionedRegion extends LocalRegion implements
protected void cacheWriterChanged(CacheWriter p_oldWriter) {
CacheWriter oldWriter = p_oldWriter;
super.cacheWriterChanged(oldWriter);
- if (isBridgeWriter(oldWriter)) {
- oldWriter = null;
- }
if (oldWriter == null ^ basicGetWriter() == null) {
new UpdateAttributesProcessor(this).distribute();
}
@@ -5469,9 +5466,6 @@ public class PartitionedRegion extends LocalRegion implements
@Override
protected void cacheLoaderChanged(CacheLoader oldLoader) {
CacheLoader myOldLoader = oldLoader;
- if (isBridgeLoader(oldLoader)) {
- myOldLoader = null;
- }
this.dataStore.cacheLoaderChanged(basicGetLoader(), myOldLoader);
super.cacheLoaderChanged(oldLoader);
if (myOldLoader == null ^ basicGetLoader() == null) {
@@ -5902,7 +5896,7 @@ public class PartitionedRegion extends LocalRegion implements
Collections.addAll(localServerGroups, MemberAttributes.parseGroups(null, c.getSystem().getConfig().getGroups()));
for (Object object : servers) {
- BridgeServerImpl server = (BridgeServerImpl)object;
+ CacheServerImpl server = (CacheServerImpl)object;
if (server.isRunning() && (server.getExternalAddress() != null)) {
Collections.addAll(localServerGroups, server.getGroups());
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java
index 4569184..0838d29 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolFactoryImpl.java
@@ -27,8 +27,6 @@ import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.RegionService;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolFactory;
-import com.gemstone.gemfire.cache.client.internal.BridgePoolImpl;
-import com.gemstone.gemfire.cache.client.internal.BridgePoolImpl.LBPolicy;
import com.gemstone.gemfire.cache.client.internal.LocatorDiscoveryCallback;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.query.QueryService;
@@ -286,141 +284,14 @@ public class PoolFactoryImpl implements PoolFactory {
this.attributes.servers.addAll(cp.getServers());
}
- public void init(Properties properties, boolean usedByBridgeWriter,
- boolean usedByGateway, GatewaySender sender) {
+ public void init(GatewaySender sender) {
+ this.attributes.setGateway(true);
this.attributes.setGatewaySender(sender);
- init(properties, usedByBridgeWriter, usedByGateway);
- }
- /**
- * Used to create a pool given the old Bridge properties
- * @param properties from a BridgeWriter or BridgeLoader
- * @param usedByBridgeWriter true if props from BridgeWriter;
- * false if props from BridgeLoader.
- * *param usedByGateway true if props from GatewayImpl.
- * @since 5.7
- */
- public void init(Properties properties, boolean usedByBridgeWriter,
- boolean usedByGateway) {
- this.attributes.setBridge(usedByBridgeWriter || !usedByGateway);
- this.attributes.setBridgeWriter(usedByBridgeWriter);
- this.attributes.setGateway(usedByGateway);
setIdleTimeout(-1); // never time out
setLoadConditioningInterval(-1); // never time out
setMaxConnections(-1);
- int endpointCount = 0;
- boolean useLocators = false;
- boolean useEndPoints = false;
- IllegalArgumentException exception = null;
- if (properties.containsKey(DistributionConfig.LOCATORS_NAME)) {
- String locatorObject = properties
- .getProperty(DistributionConfig.LOCATORS_NAME);
- if (locatorObject != null && !locatorObject.equals("")) {
- StringTokenizer locatorsOnThisVM = new StringTokenizer(locatorObject, ",");
- while (locatorsOnThisVM.hasMoreTokens()) {
- String localLocator = locatorsOnThisVM.nextToken();
- DistributionLocatorId locatorId = new DistributionLocatorId(
- localLocator);
- addLocator(locatorId.getHost().getHostName(), locatorId.getPort());
- }
- useLocators = true;
- }
- }
- if (!useLocators && properties.containsKey("endpoints")) {
- useEndPoints = true;
- String pv = properties.getProperty("endpoints");
- StringTokenizer tokenizer = new StringTokenizer(pv, ",");
- while (tokenizer.hasMoreTokens()) {
- String serverdetail = tokenizer.nextToken();
- int cIndex = serverdetail.indexOf("=");
- // note we throw the name away
-// String name = serverdetail.substring(0, cIndex);
-// if (name != null) {
-// name = name.trim();
-// }
- String remainder = serverdetail.substring(cIndex + 1);
- cIndex = remainder.lastIndexOf(":");
- String host = remainder.substring(0, cIndex);
- if (host != null) {
- host = host.trim();
- }
- String port = remainder.substring(cIndex + 1);
- if (port != null) {
- port = port.trim();
- }
- try {
- addServer(host, Integer.parseInt(port));
- endpointCount++;
- } catch (IllegalArgumentException e) {
- if (!(e.getCause() instanceof UnknownHostException)) {
- throw e;
- } else {
- exception = e;
- }
- }
- }
- if ((endpointCount == 0) && (exception != null)) {
- IllegalArgumentException ex = new IllegalArgumentException("Couldn't find any Endpoint. " + exception.getMessage());
- ex.initCause(exception.getCause());
- throw ex;
- }
- }
- if(!useLocators && !useEndPoints) {
- throw new IllegalArgumentException(
- "Property 'locators ' or 'endpoints' must be specified");
- }
- // @todo grid: handshakeTimeout ignored
- {
- // @todo grid: roundRobin and appAssisted ignored
- LBPolicy policy = new LBPolicy(properties.getProperty("LBPolicy",
- LBPolicy.STICKY_PROPERTY_NAME));
- setThreadLocalConnections(policy.isSticky());
- }
-
- if (properties.containsKey("retryAttempts")) {
- String strRetryAttempts = properties.getProperty("retryAttempts");
- setRetryAttempts(Integer.parseInt(strRetryAttempts));
- }
- if (properties.containsKey("retryInterval")) {
- String strRetryAttempts = properties.getProperty("retryInterval");
- setPingInterval(Integer.parseInt(strRetryAttempts));
- }
- if (properties.containsKey("establishCallbackConnection")) {
- String str = properties.getProperty("establishCallbackConnection");
- setSubscriptionEnabled(Boolean.valueOf(str).booleanValue());
- }
- if (properties.containsKey("enablePRSingleHop")) {
- String str = properties.getProperty("enablePRSingleHop");
- setPRSingleHopEnabled(Boolean.valueOf(str).booleanValue());
- }
- if (properties.containsKey("connectionsPerServer")) {
- String str = properties.getProperty("connectionsPerServer");
- setMinConnections(Integer.parseInt(str)*endpointCount);
- } else {
- setMinConnections(1*endpointCount);
- }
- if (properties.containsKey("redundancyLevel")) {
- String str = properties.getProperty("redundancyLevel");
- setSubscriptionRedundancy(Integer.parseInt(str));
- }
- if (properties.containsKey("readTimeout")) {
- String strReadTimeout = properties.getProperty("readTimeout");
- setReadTimeout(Integer.parseInt(strReadTimeout));
- }
- if (properties.containsKey("socketBufferSize")) {
- String strSocketBufferSize = properties.getProperty("socketBufferSize");
- setSocketBufferSize(Integer.parseInt(strSocketBufferSize));
- }
- if (properties.containsKey("messageTrackingTimeout")) {
- String pv = properties.getProperty("messageTrackingTimeout");
- setSubscriptionMessageTrackingTimeout(Integer.parseInt(pv));
- }
- if(properties.containsKey("clientAckInterval") ) {
- String pv = properties.getProperty("clientAckInterval");
- setSubscriptionAckInterval(Integer.parseInt(pv));
- }
- if(usedByGateway && exception!= null) {
- throw exception;
- }
+ setMinConnections(0);
+ setThreadLocalConnections(true);
}
/**
@@ -441,11 +312,7 @@ public class PoolFactoryImpl implements PoolFactory {
registry.creatingPool();
}
}
- if (this.attributes.isBridge()) {
- return new BridgePoolImpl(this.pm, name, this.attributes);
- } else {
- return PoolImpl.create(this.pm, name, this.attributes);
- }
+ return PoolImpl.create(this.pm, name, this.attributes);
}
/**
@@ -487,19 +354,7 @@ public class PoolFactoryImpl implements PoolFactory {
public transient LocatorDiscoveryCallback locatorCallback = null; //only used by tests
public GatewaySender gatewaySender = null;
/**
- * True if this factory needs to produce a pool for use by BridgeWriter
- * or BridgeLoader.
- */
- public boolean bridge = false;
- /**
- * True if bridge is true and the pool is used by a BridgeWriter.
- * False if bridge is true and the pool is used by a BridgeLoader.
- * Ignore this attribute if bridge is false.
- */
- public boolean bridgeWriter = false;
- /**
- * True if bridge is true and the pool is used by a Gateway.
- * Ignore this attribute if bridge is false.
+ * True if the pool is used by a Gateway.
*/
public boolean gateway = false;
@@ -554,18 +409,6 @@ public class PoolFactoryImpl implements PoolFactory {
public String getServerGroup() {
return this.serverGroup;
}
- public boolean isBridge() {
- return this.bridge;
- }
- public void setBridge(boolean v) {
- this.bridge = v;
- }
- public boolean isBridgeWriter() {
- return this.bridgeWriter;
- }
- public void setBridgeWriter(boolean v) {
- this.bridgeWriter = v;
- }
public boolean isGateway() {
return this.gateway;
}
@@ -648,7 +491,6 @@ public class PoolFactoryImpl implements PoolFactory {
DataSerializer.writeString(this.serverGroup, out);
DataSerializer.writeArrayList(this.locators, out);
DataSerializer.writeArrayList(this.servers, out);
- DataSerializer.writePrimitiveBoolean(this.bridge, out);
DataSerializer.writePrimitiveInt(this.statisticInterval, out);
DataSerializer.writePrimitiveBoolean(this.multiuserSecureModeEnabled,out);
}
@@ -671,7 +513,6 @@ public class PoolFactoryImpl implements PoolFactory {
this.serverGroup = DataSerializer.readString(in);
this.locators = DataSerializer.readArrayList(in);
this.servers = DataSerializer.readArrayList(in);
- this.bridge = DataSerializer.readPrimitiveBoolean(in);
this.statisticInterval= DataSerializer.readPrimitiveInt(in);
this.multiuserSecureModeEnabled = DataSerializer.readPrimitiveBoolean(in);
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
index 289a3f5..25ba55d 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PoolManagerImpl.java
@@ -22,7 +22,6 @@ import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.client.PoolFactory;
import com.gemstone.gemfire.cache.client.PoolManager;
-import com.gemstone.gemfire.cache.client.internal.BridgePoolImpl;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.client.internal.RegisterDataSerializersOp;
import com.gemstone.gemfire.cache.client.internal.RegisterInstantiatorsOp;
@@ -102,24 +101,6 @@ public class PoolManagerImpl {
}
/**
- * Set the keep alive flag before closing. Only for use with the deprecated
- * BridgeWriter/Loader code. A BridgeWriter is automatically
- * closed then the last region is disconnected from it,
- * so we need to mark the connections as keep alive
- * before we close the regions that use the bridge writer/loader
- *
- * @param keepAlive
- */
- public static void setKeepAlive(boolean keepAlive) {
- for(Iterator<Pool> itr = PoolManager.getAll().values().iterator(); itr.hasNext(); ) {
- Pool nextPool = itr.next();
- if(nextPool instanceof BridgePoolImpl) {
- BridgePoolImpl bridgePool = (BridgePoolImpl) nextPool;
- bridgePool.setKeepAlive(keepAlive);
- }
- }
- }
- /**
* Destroys all created pool in this manager.
*/
public void close(boolean keepAlive) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2eb4e175/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java
index 7d96986..08a6bad 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionEventImpl.java
@@ -282,7 +282,7 @@ public class RegionEventImpl
}
public ClientProxyMembershipID getContext() {
- // regular region events do not have bridge context - see BridgeRegionEventImpl
+ // regular region events do not have a context - see ClientRegionEventImpl
return null;
}