You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/19 20:28:13 UTC
[1/2] geode git commit: 2632: refactor code to use InternalCache
Repository: geode
Updated Branches:
refs/heads/feature/GEODE-2632-4 [created] 08106d330
2632: refactor code to use InternalCache
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/c44cc067
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/c44cc067
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/c44cc067
Branch: refs/heads/feature/GEODE-2632-4
Commit: c44cc0679eab2ca1cbb7cf83040421faa758e45a
Parents: 76c4983
Author: Kirk Lund <kl...@apache.org>
Authored: Wed Apr 19 13:09:10 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed Apr 19 13:09:10 2017 -0700
----------------------------------------------------------------------
.../geode/internal/cache/GemFireCacheImpl.java | 5 +++
.../geode/internal/cache/InternalCache.java | 10 +++++
.../cache/wan/AbstractGatewaySender.java | 2 +-
.../internal/cache/xmlcache/CacheCreation.java | 13 ++++--
.../cache/wan/GatewayReceiverFactoryImpl.java | 21 +++------
.../internal/cache/wan/GatewayReceiverImpl.java | 13 +++---
.../wan/GatewaySenderEventRemoteDispatcher.java | 45 +++++++-------------
.../cache/wan/GatewaySenderFactoryImpl.java | 42 ++++++++----------
.../wan/parallel/ParallelGatewaySenderImpl.java | 3 +-
.../wan/serial/SerialGatewaySenderImpl.java | 3 +-
10 files changed, 73 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 56243e1..e4f6fa1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -4087,6 +4087,11 @@ public class GemFireCacheImpl
}
}
+ @Override
+ public boolean isGemFireCacheImpl() {
+ return true;
+ }
+
public void removeGatewaySender(GatewaySender sender) {
if (isClient()) {
throw new UnsupportedOperationException("operation is not supported on a client cache");
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index 709308b..09988d0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -34,6 +34,8 @@ import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.client.internal.ClientMetadataService;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
@@ -180,4 +182,12 @@ public interface InternalCache extends Cache, Extensible<Cache> {
Set<RegionListener> getRegionListeners();
CacheConfig getCacheConfig();
+
+ void addGatewaySender(GatewaySender sender);
+
+ boolean isGemFireCacheImpl(); // TODO: eliminate this and remove deps on CacheCreation
+
+ void addGatewayReceiver(GatewayReceiver recv);
+
+ CacheServer addCacheServer(boolean isGatewayReceiver);
}
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index ab3b3cf..832391d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -292,7 +292,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
this.getSenderAdvisor().setIsPrimary(isPrimary);
}
- public Cache getCache() {
+ public InternalCache getCache() {
return this.cache;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index a5f0fc2..f38265d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -1031,14 +1031,19 @@ public class CacheCreation implements InternalCache {
return this.bridgeServers;
}
- public GatewaySender addGatewaySender(GatewaySender sender) {
+ public void addGatewaySender(GatewaySender sender) {
this.gatewaySenders.add(sender);
- return sender;
+ //return sender; TODO:KIRK: delete this line
}
- public GatewayReceiver addGatewayReceiver(GatewayReceiver receiver) {
+ @Override
+ public boolean isGemFireCacheImpl() {
+ return false;
+ }
+
+ public void addGatewayReceiver(GatewayReceiver receiver) {
this.gatewayReceivers.add(receiver);
- return receiver;
+// return receiver; TODO:KIRK: delete this line
}
public AsyncEventQueue addAsyncEventQueue(AsyncEventQueue asyncEventQueue) {
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
index eb46258..ed11b30 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
@@ -18,19 +18,16 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewayReceiverFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.xmlcache.CacheCreation;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.xmlcache.GatewayReceiverCreation;
import org.apache.geode.internal.i18n.LocalizedStrings;
/**
- *
* @since GemFire 7.0
*/
public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
@@ -51,13 +48,9 @@ public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
private List<GatewayTransportFilter> filters = new ArrayList<GatewayTransportFilter>();
- private Cache cache;
+ private InternalCache cache;
- public GatewayReceiverFactoryImpl() {
-
- }
-
- public GatewayReceiverFactoryImpl(Cache cache) {
+ public GatewayReceiverFactoryImpl(InternalCache cache) {
this.cache = cache;
}
@@ -112,11 +105,11 @@ public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
"Please specify either start port a value which is less than end port.");
}
GatewayReceiver recv = null;
- if (this.cache instanceof GemFireCacheImpl) {
+ if (this.cache.isGemFireCacheImpl()) {
recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort, this.timeBetPings,
this.socketBuffSize, this.bindAdd, this.filters, this.hostnameForSenders,
this.manualStart);
- ((GemFireCacheImpl) cache).addGatewayReceiver(recv);
+ this.cache.addGatewayReceiver(recv);
InternalDistributedSystem system =
(InternalDistributedSystem) this.cache.getDistributedSystem();
system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_CREATE, recv);
@@ -130,11 +123,11 @@ public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
ioe);
}
}
- } else if (this.cache instanceof CacheCreation) {
+ } else {
recv = new GatewayReceiverCreation(this.cache, this.startPort, this.endPort,
this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
this.hostnameForSenders, this.manualStart);
- ((CacheCreation) cache).addGatewayReceiver(recv);
+ this.cache.addGatewayReceiver(recv);
}
return recv;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
index d953d7f..1e37a00 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
@@ -23,16 +23,15 @@ import java.util.List;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.wan.GatewayReceiver;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.internal.AvailablePort;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.cache.CacheServerImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -65,12 +64,12 @@ public class GatewayReceiverImpl implements GatewayReceiver {
private CacheServer receiver;
- private final GemFireCacheImpl cache;
+ private final InternalCache cache;
- public GatewayReceiverImpl(Cache cache, int startPort, int endPort, int timeBetPings,
- int buffSize, String bindAdd, List<GatewayTransportFilter> filters, String hostnameForSenders,
- boolean manualStart) {
- this.cache = (GemFireCacheImpl) cache;
+ GatewayReceiverImpl(InternalCache cache, int startPort, int endPort, int timeBetPings,
+ int buffSize, String bindAdd, List<GatewayTransportFilter> filters, String hostnameForSenders,
+ boolean manualStart) {
+ this.cache = cache;
/*
* If user has set hostNameForSenders then it should take precedence over bindAddress. If user
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 8da5613..c5c8cac 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.internal.cache.wan;
-
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
@@ -22,6 +21,7 @@ import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.geode.GemFireIOException;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException;
import org.apache.logging.log4j.Logger;
@@ -34,7 +34,6 @@ import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
@@ -45,7 +44,6 @@ import org.apache.geode.cache.client.internal.SenderProxy;
/**
* @since GemFire 7.0
- *
*/
public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDispatcher {
@@ -55,7 +53,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
private volatile Connection connection;
- private final Set<String> notFoundRegions = new HashSet<String>();
+ private final Set<String> notFoundRegions = new HashSet<>();
private final Object notFoundRegionsSync = new Object();
@@ -73,7 +71,6 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
this.processor = eventProcessor;
this.sender = eventProcessor.getSender();
- // this.ackReaderThread = new AckReaderThread(sender);
try {
initializeConnection();
} catch (GatewaySenderException e) {
@@ -83,7 +80,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
}
}
- protected GatewayAck readAcknowledgement() {
+ private GatewayAck readAcknowledgement() {
SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
GatewayAck ack = null;
Exception ex;
@@ -197,7 +194,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
}
private boolean _dispatchBatch(List events, boolean isRetry) {
- Exception ex = null;
+ Exception ex;
int currentBatchId = this.processor.getBatchId();
connection = getConnection(true);
int batchIdForThisConnection = this.processor.getBatchId();
@@ -296,8 +293,6 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
* Acquires or adds a new <code>Connection</code> to the corresponding <code>Gateway</code>
*
* @return the <code>Connection</code>
- *
- * @throws GatewaySenderException
*/
public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException {
if (this.processor.isStopped()) {
@@ -361,8 +356,6 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
/**
* Initializes the <code>Connection</code>.
- *
- * @throws GatewaySenderException
*/
private void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
this.connectionLifeCycleLock.writeLock().lock();
@@ -468,7 +461,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
}
if (this.failedConnectCount > 0) {
Object[] logArgs = new Object[] {this.processor.getSender().getId(), con,
- Integer.valueOf(this.failedConnectCount)};
+ this.failedConnectCount};
logger.info(LocalizedMessage.create(
LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS,
logArgs));
@@ -490,7 +483,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
}
}
- protected boolean logConnectionFailure() {
+ private boolean logConnectionFailure() {
// always log the first failure
if (logger.isDebugEnabled() || this.failedConnectCount == 0) {
return true;
@@ -533,39 +526,39 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
/**
* @return the batchId
*/
- public int getBatchId() {
+ int getBatchId() {
return batchId;
}
- public BatchException70 getBatchException() {
+ BatchException70 getBatchException() {
return this.be;
}
}
class AckReaderThread extends Thread {
- private Object runningStateLock = new Object();
+ private final Object runningStateLock = new Object();
/**
* boolean to make a shutdown request
*/
private volatile boolean shutdown = false;
- private final GemFireCacheImpl cache;
+ private final InternalCache cache;
private volatile boolean ackReaderThreadRunning = false;
- public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
+ AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
this(sender, processor.getName());
}
- public AckReaderThread(GatewaySender sender, String name) {
+ AckReaderThread(GatewaySender sender, String name) {
super("AckReaderThread for : " + name);
this.setDaemon(true);
- this.cache = (GemFireCacheImpl) ((AbstractGatewaySender) sender).getCache();
+ this.cache = ((AbstractGatewaySender) sender).getCache();
}
- public void waitForRunningAckReaderThreadRunningState() {
+ void waitForRunningAckReaderThreadRunningState() {
synchronized (runningStateLock) {
while (!this.ackReaderThreadRunning) {
try {
@@ -671,14 +664,9 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
}
ackReaderThreadRunning = false;
}
-
}
- /**
- * @param exception
- *
- */
- protected void logBatchExceptions(BatchException70 exception) {
+ void logBatchExceptions(BatchException70 exception) {
try {
for (BatchException70 be : exception.getExceptions()) {
boolean logWarning = true;
@@ -785,7 +773,6 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
} catch (ConnectionDestroyedException e) {
logger.info("AckReader shutting down and connection already destroyed");
}
-
}
}
@@ -805,6 +792,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
return connection != null && !connection.isDestroyed();
}
+ @Override
public void stop() {
stopAckReaderThread();
if (this.processor.isStopped()) {
@@ -812,4 +800,3 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
}
}
}
-
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
index 2c7925b..9111d9f 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
@@ -18,7 +18,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Logger;
-import org.apache.geode.cache.Cache;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback;
import org.apache.geode.cache.wan.GatewayEventFilter;
@@ -28,11 +27,9 @@ import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderImpl;
import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderImpl;
-import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;
import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;
import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -197,7 +194,7 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {
}
// Verify socket read timeout if a proper logger is available
- if (this.cache instanceof GemFireCacheImpl) {
+ if (this.cache.isGemFireCacheImpl()) {
// If socket read timeout is less than the minimum, log a warning.
// Ideally, this should throw a GatewaySenderException, but wan dunit tests
// were failing, and we were running out of time to change them.
@@ -221,27 +218,22 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {
}
if (this.attrs.isParallel()) {
- // if(this.attrs.getDispatcherThreads() != 1){
- // throw new GatewaySenderException(
- // LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_DISPATCHER_THREADS_OTHER_THAN_1
- // .toLocalizedString(id));
- // }
if ((this.attrs.getOrderPolicy() != null)
&& this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
throw new GatewaySenderException(
LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
.toLocalizedString(id, this.attrs.getOrderPolicy()));
}
- if (this.cache instanceof GemFireCacheImpl) {
+ if (this.cache.isGemFireCacheImpl()) {
sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
- ((GemFireCacheImpl) this.cache).addGatewaySender(sender);
+ this.cache.addGatewaySender(sender);
if (!this.attrs.isManualStart()) {
sender.start();
}
- } else if (this.cache instanceof CacheCreation) {
+ } else {
sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
- ((CacheCreation) this.cache).addGatewaySender(sender);
+ this.cache.addGatewaySender(sender);
}
} else {
if (this.attrs.getAsyncEventListeners().size() > 0) {
@@ -252,16 +244,16 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {
if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
}
- if (this.cache instanceof GemFireCacheImpl) {
+ if (this.cache.isGemFireCacheImpl()) {
sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
- ((GemFireCacheImpl) this.cache).addGatewaySender(sender);
+ this.cache.addGatewaySender(sender);
if (!this.attrs.isManualStart()) {
sender.start();
}
- } else if (this.cache instanceof CacheCreation) {
+ } else {
sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
- ((CacheCreation) this.cache).addGatewaySender(sender);
+ this.cache.addGatewaySender(sender);
}
}
return sender;
@@ -285,29 +277,29 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {
.toLocalizedString(id, this.attrs.getOrderPolicy()));
}
- if (this.cache instanceof GemFireCacheImpl) {
+ if (this.cache.isGemFireCacheImpl()) {
sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
- ((GemFireCacheImpl) this.cache).addGatewaySender(sender);
+ this.cache.addGatewaySender(sender);
if (!this.attrs.isManualStart()) {
sender.start();
}
- } else if (this.cache instanceof CacheCreation) {
+ } else {
sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
- ((CacheCreation) this.cache).addGatewaySender(sender);
+ this.cache.addGatewaySender(sender);
}
} else {
if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
}
- if (this.cache instanceof GemFireCacheImpl) {
+ if (this.cache.isGemFireCacheImpl()) {
sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
- ((GemFireCacheImpl) this.cache).addGatewaySender(sender);
+ this.cache.addGatewaySender(sender);
if (!this.attrs.isManualStart()) {
sender.start();
}
- } else if (this.cache instanceof CacheCreation) {
+ } else {
sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
- ((CacheCreation) this.cache).addGatewaySender(sender);
+ this.cache.addGatewaySender(sender);
}
}
return sender;
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index c2d4673..d52df3c 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -25,7 +25,6 @@ import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegionHelper;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
@@ -79,7 +78,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
// Only notify the type registry if this is a WAN gateway queue
if (!isAsyncEventQueue()) {
- ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+ getCache().getPdxRegistry().gatewaySenderStarted(this);
}
new UpdateAttributesProcessor(this).distribute(false);
http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index b300460..53b0eca 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -26,7 +26,6 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
@@ -93,7 +92,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
// Only notify the type registry if this is a WAN gateway queue
if (!isAsyncEventQueue()) {
- ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+ getCache().getPdxRegistry().gatewaySenderStarted(this);
}
new UpdateAttributesProcessor(this).distribute(false);
[2/2] geode git commit: 2632: fix up getCache synchronization in
AutoBalancer
Posted by kl...@apache.org.
2632: fix up getCache synchronization in AutoBalancer
Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/08106d33
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/08106d33
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/08106d33
Branch: refs/heads/feature/GEODE-2632-4
Commit: 08106d3305c8b3179e2e6a9055754a5452d5080f
Parents: c44cc06
Author: Kirk Lund <kl...@apache.org>
Authored: Wed Apr 19 13:26:36 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed Apr 19 13:26:36 2017 -0700
----------------------------------------------------------------------
.../apache/geode/cache/util/AutoBalancer.java | 21 ++++++++++----------
1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/geode/blob/08106d33/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java
----------------------------------------------------------------------
diff --git a/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java b/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java
index ab8b5b0..2965f7f 100644
--- a/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java
+++ b/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java
@@ -25,12 +25,13 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
-import org.apache.geode.annotations.Experimental;
import org.apache.logging.log4j.Logger;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.apache.geode.GemFireConfigException;
+import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.GemFireCache;
@@ -344,14 +345,14 @@ public class AutoBalancer implements Declarable {
static class GeodeCacheFacade implements CacheOperationFacade {
private final AtomicBoolean isLockAcquired = new AtomicBoolean(false);
- private InternalCache cache;
+ private final AtomicReference<InternalCache> cacheRef = new AtomicReference();
public GeodeCacheFacade() {
this(null);
}
public GeodeCacheFacade(InternalCache cache) {
- this.cache = cache;
+ this.cacheRef.set(cache);
}
@Override
@@ -443,16 +444,16 @@ public class AutoBalancer implements Declarable {
}
InternalCache getCache() {
- if (cache == null) {
+ if (cacheRef.get() == null) {
synchronized (this) {
- if (cache == null) {
- cache = GemFireCacheImpl.getInstance();
- if (cache == null) {
- throw new IllegalStateException("Missing cache instance.");
- }
- }
+ cacheRef.set(GemFireCacheImpl.getInstance());
}
}
+
+ InternalCache cache = cacheRef.get();
+ if (cache == null) {
+ throw new IllegalStateException("Missing cache instance.");
+ }
if (cache.isClosed()) {
throw new CacheClosedException();
}