You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/04/19 20:28:13 UTC

[1/2] geode git commit: 2632: refactor code to use InternalCache

Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2632-4 [created] 08106d330


2632: refactor code to use InternalCache


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/c44cc067
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/c44cc067
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/c44cc067

Branch: refs/heads/feature/GEODE-2632-4
Commit: c44cc0679eab2ca1cbb7cf83040421faa758e45a
Parents: 76c4983
Author: Kirk Lund <kl...@apache.org>
Authored: Wed Apr 19 13:09:10 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed Apr 19 13:09:10 2017 -0700

----------------------------------------------------------------------
 .../geode/internal/cache/GemFireCacheImpl.java  |  5 +++
 .../geode/internal/cache/InternalCache.java     | 10 +++++
 .../cache/wan/AbstractGatewaySender.java        |  2 +-
 .../internal/cache/xmlcache/CacheCreation.java  | 13 ++++--
 .../cache/wan/GatewayReceiverFactoryImpl.java   | 21 +++------
 .../internal/cache/wan/GatewayReceiverImpl.java | 13 +++---
 .../wan/GatewaySenderEventRemoteDispatcher.java | 45 +++++++-------------
 .../cache/wan/GatewaySenderFactoryImpl.java     | 42 ++++++++----------
 .../wan/parallel/ParallelGatewaySenderImpl.java |  3 +-
 .../wan/serial/SerialGatewaySenderImpl.java     |  3 +-
 10 files changed, 73 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index 56243e1..e4f6fa1 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -4087,6 +4087,11 @@ public class GemFireCacheImpl
     }
   }
 
+  @Override
+  public boolean isGemFireCacheImpl() {
+    return true;
+  }
+
   public void removeGatewaySender(GatewaySender sender) {
     if (isClient()) {
       throw new UnsupportedOperationException("operation is not supported on a client cache");

http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index 709308b..09988d0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -34,6 +34,8 @@ import org.apache.geode.cache.TimeoutException;
 import org.apache.geode.cache.client.internal.ClientMetadataService;
 import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.query.internal.cq.CqService;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.wan.GatewayReceiver;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.DistributedMember;
@@ -180,4 +182,12 @@ public interface InternalCache extends Cache, Extensible<Cache> {
   Set<RegionListener> getRegionListeners();
 
   CacheConfig getCacheConfig();
+
+  void addGatewaySender(GatewaySender sender);
+
+  boolean isGemFireCacheImpl(); // TODO: eliminate this and remove deps on CacheCreation
+
+  void addGatewayReceiver(GatewayReceiver recv);
+
+  CacheServer addCacheServer(boolean isGatewayReceiver);
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index ab3b3cf..832391d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -292,7 +292,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
     this.getSenderAdvisor().setIsPrimary(isPrimary);
   }
 
-  public Cache getCache() {
+  public InternalCache getCache() {
     return this.cache;
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index a5f0fc2..f38265d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -1031,14 +1031,19 @@ public class CacheCreation implements InternalCache {
     return this.bridgeServers;
   }
 
-  public GatewaySender addGatewaySender(GatewaySender sender) {
+  public void addGatewaySender(GatewaySender sender) {
     this.gatewaySenders.add(sender);
-    return sender;
+    //return sender; TODO:KIRK: delete this line
   }
 
-  public GatewayReceiver addGatewayReceiver(GatewayReceiver receiver) {
+  @Override
+  public boolean isGemFireCacheImpl() {
+    return false;
+  }
+
+  public void addGatewayReceiver(GatewayReceiver receiver) {
     this.gatewayReceivers.add(receiver);
-    return receiver;
+//    return receiver; TODO:KIRK: delete this line
   }
 
   public AsyncEventQueue addAsyncEventQueue(AsyncEventQueue asyncEventQueue) {

http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
index eb46258..ed11b30 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverFactoryImpl.java
@@ -18,19 +18,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.wan.GatewayReceiver;
 import org.apache.geode.cache.wan.GatewayReceiverFactory;
 import org.apache.geode.cache.wan.GatewayTransportFilter;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ResourceEvent;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.cache.xmlcache.CacheCreation;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.xmlcache.GatewayReceiverCreation;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 /**
- * 
  * @since GemFire 7.0
  */
 public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
@@ -51,13 +48,9 @@ public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
 
   private List<GatewayTransportFilter> filters = new ArrayList<GatewayTransportFilter>();
 
-  private Cache cache;
+  private InternalCache cache;
 
-  public GatewayReceiverFactoryImpl() {
-
-  }
-
-  public GatewayReceiverFactoryImpl(Cache cache) {
+  public GatewayReceiverFactoryImpl(InternalCache cache) {
     this.cache = cache;
   }
 
@@ -112,11 +105,11 @@ public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
           "Please specify either start port a value which is less than end port.");
     }
     GatewayReceiver recv = null;
-    if (this.cache instanceof GemFireCacheImpl) {
+    if (this.cache.isGemFireCacheImpl()) {
       recv = new GatewayReceiverImpl(this.cache, this.startPort, this.endPort, this.timeBetPings,
           this.socketBuffSize, this.bindAdd, this.filters, this.hostnameForSenders,
           this.manualStart);
-      ((GemFireCacheImpl) cache).addGatewayReceiver(recv);
+      this.cache.addGatewayReceiver(recv);
       InternalDistributedSystem system =
           (InternalDistributedSystem) this.cache.getDistributedSystem();
       system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_CREATE, recv);
@@ -130,11 +123,11 @@ public class GatewayReceiverFactoryImpl implements GatewayReceiverFactory {
               ioe);
         }
       }
-    } else if (this.cache instanceof CacheCreation) {
+    } else {
       recv = new GatewayReceiverCreation(this.cache, this.startPort, this.endPort,
           this.timeBetPings, this.socketBuffSize, this.bindAdd, this.filters,
           this.hostnameForSenders, this.manualStart);
-      ((CacheCreation) cache).addGatewayReceiver(recv);
+      this.cache.addGatewayReceiver(recv);
     }
     return recv;
   }

http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
index d953d7f..1e37a00 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
@@ -23,16 +23,15 @@ import java.util.List;
 
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.wan.GatewayReceiver;
 import org.apache.geode.cache.wan.GatewayTransportFilter;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ResourceEvent;
 import org.apache.geode.internal.AvailablePort;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.cache.CacheServerImpl;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
@@ -65,12 +64,12 @@ public class GatewayReceiverImpl implements GatewayReceiver {
 
   private CacheServer receiver;
 
-  private final GemFireCacheImpl cache;
+  private final InternalCache cache;
 
-  public GatewayReceiverImpl(Cache cache, int startPort, int endPort, int timeBetPings,
-      int buffSize, String bindAdd, List<GatewayTransportFilter> filters, String hostnameForSenders,
-      boolean manualStart) {
-    this.cache = (GemFireCacheImpl) cache;
+  GatewayReceiverImpl(InternalCache cache, int startPort, int endPort, int timeBetPings,
+                      int buffSize, String bindAdd, List<GatewayTransportFilter> filters, String hostnameForSenders,
+                      boolean manualStart) {
+    this.cache = cache;
 
     /*
      * If user has set hostNameForSenders then it should take precedence over bindAddress. If user

http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 8da5613..c5c8cac 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -14,7 +14,6 @@
  */
 package org.apache.geode.internal.cache.wan;
 
-
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
@@ -22,6 +21,7 @@ import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.geode.GemFireIOException;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.sockets.MessageTooLargeException;
 import org.apache.logging.log4j.Logger;
 
@@ -34,7 +34,6 @@ import org.apache.geode.cache.client.internal.Connection;
 import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
 import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.internal.ServerLocation;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.UpdateAttributesProcessor;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
@@ -45,7 +44,6 @@ import org.apache.geode.cache.client.internal.SenderProxy;
 
 /**
  * @since GemFire 7.0
- *
  */
 public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDispatcher {
 
@@ -55,7 +53,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
 
   private volatile Connection connection;
 
-  private final Set<String> notFoundRegions = new HashSet<String>();
+  private final Set<String> notFoundRegions = new HashSet<>();
 
   private final Object notFoundRegionsSync = new Object();
 
@@ -73,7 +71,6 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
   public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor eventProcessor) {
     this.processor = eventProcessor;
     this.sender = eventProcessor.getSender();
-    // this.ackReaderThread = new AckReaderThread(sender);
     try {
       initializeConnection();
     } catch (GatewaySenderException e) {
@@ -83,7 +80,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
     }
   }
 
-  protected GatewayAck readAcknowledgement() {
+  private GatewayAck readAcknowledgement() {
     SenderProxy sp = new SenderProxy(this.processor.getSender().getProxy());
     GatewayAck ack = null;
     Exception ex;
@@ -197,7 +194,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
   }
 
   private boolean _dispatchBatch(List events, boolean isRetry) {
-    Exception ex = null;
+    Exception ex;
     int currentBatchId = this.processor.getBatchId();
     connection = getConnection(true);
     int batchIdForThisConnection = this.processor.getBatchId();
@@ -296,8 +293,6 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
    * Acquires or adds a new <code>Connection</code> to the corresponding <code>Gateway</code>
    *
    * @return the <code>Connection</code>
-   *
-   * @throws GatewaySenderException
    */
   public Connection getConnection(boolean startAckReaderThread) throws GatewaySenderException {
     if (this.processor.isStopped()) {
@@ -361,8 +356,6 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
 
   /**
    * Initializes the <code>Connection</code>.
-   *
-   * @throws GatewaySenderException
    */
   private void initializeConnection() throws GatewaySenderException, GemFireSecurityException {
     this.connectionLifeCycleLock.writeLock().lock();
@@ -468,7 +461,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       }
       if (this.failedConnectCount > 0) {
         Object[] logArgs = new Object[] {this.processor.getSender().getId(), con,
-            Integer.valueOf(this.failedConnectCount)};
+            this.failedConnectCount};
         logger.info(LocalizedMessage.create(
             LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS,
             logArgs));
@@ -490,7 +483,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
     }
   }
 
-  protected boolean logConnectionFailure() {
+  private boolean logConnectionFailure() {
     // always log the first failure
     if (logger.isDebugEnabled() || this.failedConnectCount == 0) {
       return true;
@@ -533,39 +526,39 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
     /**
      * @return the batchId
      */
-    public int getBatchId() {
+    int getBatchId() {
       return batchId;
     }
 
-    public BatchException70 getBatchException() {
+    BatchException70 getBatchException() {
       return this.be;
     }
   }
 
   class AckReaderThread extends Thread {
 
-    private Object runningStateLock = new Object();
+    private final Object runningStateLock = new Object();
 
     /**
      * boolean to make a shutdown request
      */
     private volatile boolean shutdown = false;
 
-    private final GemFireCacheImpl cache;
+    private final InternalCache cache;
 
     private volatile boolean ackReaderThreadRunning = false;
 
-    public AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
+    AckReaderThread(GatewaySender sender, AbstractGatewaySenderEventProcessor processor) {
       this(sender, processor.getName());
     }
 
-    public AckReaderThread(GatewaySender sender, String name) {
+    AckReaderThread(GatewaySender sender, String name) {
       super("AckReaderThread for : " + name);
       this.setDaemon(true);
-      this.cache = (GemFireCacheImpl) ((AbstractGatewaySender) sender).getCache();
+      this.cache = ((AbstractGatewaySender) sender).getCache();
     }
 
-    public void waitForRunningAckReaderThreadRunningState() {
+    void waitForRunningAckReaderThreadRunningState() {
       synchronized (runningStateLock) {
         while (!this.ackReaderThreadRunning) {
           try {
@@ -671,14 +664,9 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
         }
         ackReaderThreadRunning = false;
       }
-
     }
 
-    /**
-     * @param exception
-     * 
-     */
-    protected void logBatchExceptions(BatchException70 exception) {
+    void logBatchExceptions(BatchException70 exception) {
       try {
         for (BatchException70 be : exception.getExceptions()) {
           boolean logWarning = true;
@@ -785,7 +773,6 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
       } catch (ConnectionDestroyedException e) {
         logger.info("AckReader shutting down and connection already destroyed");
       }
-
     }
   }
 
@@ -805,6 +792,7 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
     return connection != null && !connection.isDestroyed();
   }
 
+  @Override
   public void stop() {
     stopAckReaderThread();
     if (this.processor.isStopped()) {
@@ -812,4 +800,3 @@ public class GatewaySenderEventRemoteDispatcher implements GatewaySenderEventDis
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
index 2c7925b..9111d9f 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
@@ -18,7 +18,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.logging.log4j.Logger;
 
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
 import org.apache.geode.cache.client.internal.LocatorDiscoveryCallback;
 import org.apache.geode.cache.wan.GatewayEventFilter;
@@ -28,11 +27,9 @@ import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
 import org.apache.geode.cache.wan.GatewaySenderFactory;
 import org.apache.geode.cache.wan.GatewayTransportFilter;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderImpl;
 import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderImpl;
-import org.apache.geode.internal.cache.xmlcache.CacheCreation;
 import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;
 import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;
 import org.apache.geode.internal.i18n.LocalizedStrings;
@@ -197,7 +194,7 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {
     }
 
     // Verify socket read timeout if a proper logger is available
-    if (this.cache instanceof GemFireCacheImpl) {
+    if (this.cache.isGemFireCacheImpl()) {
       // If socket read timeout is less than the minimum, log a warning.
       // Ideally, this should throw a GatewaySenderException, but wan dunit tests
       // were failing, and we were running out of time to change them.
@@ -221,27 +218,22 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {
     }
 
     if (this.attrs.isParallel()) {
-      // if(this.attrs.getDispatcherThreads() != 1){
-      // throw new GatewaySenderException(
-      // LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_DISPATCHER_THREADS_OTHER_THAN_1
-      // .toLocalizedString(id));
-      // }
       if ((this.attrs.getOrderPolicy() != null)
           && this.attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
         throw new GatewaySenderException(
             LocalizedStrings.GatewaySenderImpl_PARALLEL_GATEWAY_SENDER_0_CANNOT_BE_CREATED_WITH_ORDER_POLICY_1
                 .toLocalizedString(id, this.attrs.getOrderPolicy()));
       }
-      if (this.cache instanceof GemFireCacheImpl) {
+      if (this.cache.isGemFireCacheImpl()) {
         sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
-        ((GemFireCacheImpl) this.cache).addGatewaySender(sender);
+        this.cache.addGatewaySender(sender);
 
         if (!this.attrs.isManualStart()) {
           sender.start();
         }
-      } else if (this.cache instanceof CacheCreation) {
+      } else {
         sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
-        ((CacheCreation) this.cache).addGatewaySender(sender);
+        this.cache.addGatewaySender(sender);
       }
     } else {
       if (this.attrs.getAsyncEventListeners().size() > 0) {
@@ -252,16 +244,16 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {
       if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
         this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
       }
-      if (this.cache instanceof GemFireCacheImpl) {
+      if (this.cache.isGemFireCacheImpl()) {
         sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
-        ((GemFireCacheImpl) this.cache).addGatewaySender(sender);
+        this.cache.addGatewaySender(sender);
 
         if (!this.attrs.isManualStart()) {
           sender.start();
         }
-      } else if (this.cache instanceof CacheCreation) {
+      } else {
         sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
-        ((CacheCreation) this.cache).addGatewaySender(sender);
+        this.cache.addGatewaySender(sender);
       }
     }
     return sender;
@@ -285,29 +277,29 @@ public class GatewaySenderFactoryImpl implements InternalGatewaySenderFactory {
                 .toLocalizedString(id, this.attrs.getOrderPolicy()));
       }
 
-      if (this.cache instanceof GemFireCacheImpl) {
+      if (this.cache.isGemFireCacheImpl()) {
         sender = new ParallelGatewaySenderImpl(this.cache, this.attrs);
-        ((GemFireCacheImpl) this.cache).addGatewaySender(sender);
+        this.cache.addGatewaySender(sender);
         if (!this.attrs.isManualStart()) {
           sender.start();
         }
-      } else if (this.cache instanceof CacheCreation) {
+      } else {
         sender = new ParallelGatewaySenderCreation(this.cache, this.attrs);
-        ((CacheCreation) this.cache).addGatewaySender(sender);
+        this.cache.addGatewaySender(sender);
       }
     } else {
       if (this.attrs.getOrderPolicy() == null && this.attrs.getDispatcherThreads() > 1) {
         this.attrs.policy = GatewaySender.DEFAULT_ORDER_POLICY;
       }
-      if (this.cache instanceof GemFireCacheImpl) {
+      if (this.cache.isGemFireCacheImpl()) {
         sender = new SerialGatewaySenderImpl(this.cache, this.attrs);
-        ((GemFireCacheImpl) this.cache).addGatewaySender(sender);
+        this.cache.addGatewaySender(sender);
         if (!this.attrs.isManualStart()) {
           sender.start();
         }
-      } else if (this.cache instanceof CacheCreation) {
+      } else {
         sender = new SerialGatewaySenderCreation(this.cache, this.attrs);
-        ((CacheCreation) this.cache).addGatewaySender(sender);
+        this.cache.addGatewaySender(sender);
       }
     }
     return sender;

http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
index c2d4673..d52df3c 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderImpl.java
@@ -25,7 +25,6 @@ import org.apache.geode.distributed.internal.ResourceEvent;
 import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PartitionedRegionHelper;
 import org.apache.geode.internal.cache.UpdateAttributesProcessor;
@@ -79,7 +78,7 @@ public class ParallelGatewaySenderImpl extends AbstractRemoteGatewaySender {
 
       // Only notify the type registry if this is a WAN gateway queue
       if (!isAsyncEventQueue()) {
-        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+        getCache().getPdxRegistry().gatewaySenderStarted(this);
       }
       new UpdateAttributesProcessor(this).distribute(false);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/c44cc067/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
----------------------------------------------------------------------
diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index b300460..53b0eca 100644
--- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -26,7 +26,6 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ResourceEvent;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.UpdateAttributesProcessor;
@@ -93,7 +92,7 @@ public class SerialGatewaySenderImpl extends AbstractRemoteGatewaySender {
 
       // Only notify the type registry if this is a WAN gateway queue
       if (!isAsyncEventQueue()) {
-        ((GemFireCacheImpl) getCache()).getPdxRegistry().gatewaySenderStarted(this);
+        getCache().getPdxRegistry().gatewaySenderStarted(this);
       }
       new UpdateAttributesProcessor(this).distribute(false);
 


[2/2] geode git commit: 2632: fix up getCache synchronization in AutoBalancer

Posted by kl...@apache.org.
2632: fix up getCache synchronization in AutoBalancer


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/08106d33
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/08106d33
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/08106d33

Branch: refs/heads/feature/GEODE-2632-4
Commit: 08106d3305c8b3179e2e6a9055754a5452d5080f
Parents: c44cc06
Author: Kirk Lund <kl...@apache.org>
Authored: Wed Apr 19 13:26:36 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed Apr 19 13:26:36 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/cache/util/AutoBalancer.java   | 21 ++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/08106d33/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java
----------------------------------------------------------------------
diff --git a/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java b/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java
index ab8b5b0..2965f7f 100644
--- a/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java
+++ b/geode-rebalancer/src/main/java/org/apache/geode/cache/util/AutoBalancer.java
@@ -25,12 +25,13 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.geode.annotations.Experimental;
 import org.apache.logging.log4j.Logger;
 import org.springframework.scheduling.support.CronSequenceGenerator;
 
 import org.apache.geode.GemFireConfigException;
+import org.apache.geode.annotations.Experimental;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.Declarable;
 import org.apache.geode.cache.GemFireCache;
@@ -344,14 +345,14 @@ public class AutoBalancer implements Declarable {
   static class GeodeCacheFacade implements CacheOperationFacade {
     private final AtomicBoolean isLockAcquired = new AtomicBoolean(false);
 
-    private InternalCache cache;
+    private final AtomicReference<InternalCache> cacheRef = new AtomicReference();
 
     public GeodeCacheFacade() {
       this(null);
     }
 
     public GeodeCacheFacade(InternalCache cache) {
-      this.cache = cache;
+      this.cacheRef.set(cache);
     }
 
     @Override
@@ -443,16 +444,16 @@ public class AutoBalancer implements Declarable {
     }
 
     InternalCache getCache() {
-      if (cache == null) {
+      if (cacheRef.get() == null) {
         synchronized (this) {
-          if (cache == null) {
-            cache = GemFireCacheImpl.getInstance();
-            if (cache == null) {
-              throw new IllegalStateException("Missing cache instance.");
-            }
-          }
+          cacheRef.set(GemFireCacheImpl.getInstance());
         }
       }
+
+      InternalCache cache = cacheRef.get();
+      if (cache == null) {
+        throw new IllegalStateException("Missing cache instance.");
+      }
       if (cache.isClosed()) {
         throw new CacheClosedException();
       }