You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2021/08/17 18:17:40 UTC

[geode] 08/18: GEODE-6588: Cleanup GatewaySenderAdvisor

This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit fe190e6b99fedbeadd92c73937aa57aa547a9ce0
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Thu May 20 15:09:56 2021 -0700

    GEODE-6588: Cleanup GatewaySenderAdvisor
---
 .../distributed/internal/locks/DLockService.java   |   2 +
 .../internal/cache/wan/GatewaySenderAdvisor.java   | 198 +++++++++------------
 2 files changed, 87 insertions(+), 113 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
index 3813c16..652245c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
@@ -2720,6 +2721,7 @@ public class DLockService extends DistributedLockService {
    *
    * @see org.apache.geode.distributed.DistributedLockService#create(String, DistributedSystem)
    */
+  @NotNull
   public static DistributedLockService create(String serviceName, InternalDistributedSystem ds,
       boolean distributed, boolean destroyOnDisconnect, boolean automateFreeResources)
       throws IllegalArgumentException, IllegalStateException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
index 401296c..bf50c63 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
@@ -41,7 +41,6 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.locks.DLockService;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.UpdateAttributesProcessor;
@@ -65,12 +64,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
 
   private Thread lockObtainingThread;
 
-  private AbstractGatewaySender sender;
+  private final AbstractGatewaySender sender;
 
   private GatewaySenderAdvisor(DistributionAdvisee sender) {
     super(sender);
     this.sender = (AbstractGatewaySender) sender;
-    this.lockToken = getDLockServiceName() + "-token";
+    lockToken = getDLockServiceName() + "-token";
   }
 
   public static GatewaySenderAdvisor createGatewaySenderAdvisor(DistributionAdvisee sender) {
@@ -80,11 +79,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
   }
 
   public String getDLockServiceName() {
-    return getClass().getName() + "_" + this.sender.getId();
+    return getClass().getName() + "_" + sender.getId();
   }
 
   public Thread getLockObtainingThread() {
-    return this.lockObtainingThread;
+    return lockObtainingThread;
   }
 
   /** Instantiate new Sender profile for this member */
@@ -163,7 +162,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
       }
     }
 
-    List<String> senderEventFilterClassNames = new ArrayList<String>();
+    List<String> senderEventFilterClassNames = new ArrayList<>();
     for (org.apache.geode.cache.wan.GatewayEventFilter filter : sender.getGatewayEventFilters()) {
       senderEventFilterClassNames.add(filter.getClass().getName());
     }
@@ -183,7 +182,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
       }
     }
 
-    Set<String> senderTransportFilterClassNames = new LinkedHashSet<String>();
+    Set<String> senderTransportFilterClassNames = new LinkedHashSet<>();
     for (GatewayTransportFilter filter : sender.getGatewayTransportFilters()) {
       senderTransportFilterClassNames.add(filter.getClass().getName());
     }
@@ -205,7 +204,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
         }
       }
     }
-    List<String> senderEventListenerClassNames = new ArrayList<String>();
+    List<String> senderEventListenerClassNames = new ArrayList<>();
     for (AsyncEventListener listener : sender.getAsyncEventListeners()) {
       senderEventListenerClassNames.add(listener.getClass().getName());
     }
@@ -259,13 +258,13 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
             return;
           }
           // IF this sender is not primary
-          if (!this.sender.isPrimary()) {
+          if (!sender.isPrimary()) {
             if (!adviseEldestGatewaySender()) {// AND this is not the eldest
                                                // sender
               if (logger.isDebugEnabled()) {
                 logger.debug(
                     "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...",
-                    this.sender);
+                    sender);
               }
               return;
             }
@@ -273,7 +272,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
           }
         } else {
           if (sp.serverLocation != null) {
-            this.sender.setServerLocation(sp.serverLocation);
+            sender.setServerLocation(sp.serverLocation);
           }
         }
       }
@@ -293,12 +292,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
         if (advisePrimaryGatewaySender() != null) {
           return;
         }
-        if (!this.sender.isPrimary()) {// IF this sender is not primary
+        if (!sender.isPrimary()) {// IF this sender is not primary
           if (!adviseEldestGatewaySender()) {// AND this is not the eldest sender
             if (logger.isDebugEnabled()) {
               logger.debug(
                   "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...",
-                  this.sender);
+                  sender);
             }
             return;
           }
@@ -309,25 +308,24 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
   }
 
   public boolean isPrimary() {
-    return sender.isParallel() || this.isPrimary;
+    return sender.isParallel() || isPrimary;
   }
 
   public void initDLockService() {
-    InternalDistributedSystem ds = this.sender.getCache().getInternalDistributedSystem();
+    InternalDistributedSystem ds = sender.getCache().getInternalDistributedSystem();
     String dlsName = getDLockServiceName();
-    this.lockService = DistributedLockService.getServiceNamed(dlsName);
-    if (this.lockService == null) {
-      this.lockService = DLockService.create(dlsName, ds, true, true, true);
+    lockService = DistributedLockService.getServiceNamed(dlsName);
+    if (lockService == null) {
+      lockService = DLockService.create(dlsName, ds, true, true, true);
     }
-    Assert.assertTrue(this.lockService != null);
     if (logger.isDebugEnabled()) {
-      logger.debug("{}: Obtained DistributedLockService: {}", this, this.lockService);
+      logger.debug("{}: Obtained DistributedLockService: {}", this, lockService);
     }
   }
 
   public boolean volunteerForPrimary() {
     if (logger.isDebugEnabled()) {
-      logger.debug("Sender : {} is volunteering for Primary ", this.sender.getId());
+      logger.debug("Sender : {} is volunteering for Primary ", sender.getId());
     }
 
     if (advisePrimaryGatewaySender() == null) {
@@ -335,16 +333,16 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
         if (logger.isDebugEnabled()) {
           logger.debug(
               "Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...",
-              this.sender);
+              sender);
         }
         return false;
       }
       if (logger.isDebugEnabled()) {
         logger.debug("Sender : {} no Primary available. So going to acquire distributed lock",
-            this.sender);
+            sender);
       }
-      this.lockService.lock(this.lockToken, 10000, -1);
-      return this.lockService.isHeldByCurrentThread(this.lockToken);
+      lockService.lock(lockToken, 10000, -1);
+      return lockService.isHeldByCurrentThread(lockToken);
     }
     return false;
   }
@@ -357,11 +355,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
    * @return boolean true if this eldest sender; false otherwise
    */
   private boolean adviseEldestGatewaySender() {
-    Profile[] snapshot = this.profiles;
+    Profile[] snapshot = profiles;
 
     // sender with minimum startTime is eldest. Find out the minimum start time
     // of remote senders.
-    TreeSet<Long> senderStartTimes = new TreeSet<Long>();
+    TreeSet<Long> senderStartTimes = new TreeSet<>();
     for (Profile profile : snapshot) {
       GatewaySenderProfile sp = (GatewaySenderProfile) profile;
       if (!sp.isParallel && sp.isRunning) {
@@ -375,96 +373,70 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
     // it should give up
     // and allow existing running senders to volunteer
     return (senderStartTimes.isEmpty())
-        || (this.sender.isRunning() && (this.sender.startTime <= senderStartTimes.first()));
-  }
-
-  private InternalDistributedMember adviseEldestGatewaySenderNode() {
-    Profile[] snapshot = this.profiles;
-
-    // sender with minimum startTime is eldest. Find out the minimum start time
-    // of remote senders.
-    InternalDistributedMember node = null;
-    GatewaySenderProfile eldestProfile = null;
-    for (Profile profile : snapshot) {
-      GatewaySenderProfile sp = (GatewaySenderProfile) profile;
-      if (!sp.isParallel && sp.isRunning) {
-        if (eldestProfile == null) {
-          eldestProfile = sp;
-        }
-        if (sp.startTime < eldestProfile.startTime) {
-          eldestProfile = sp;
-        }
-      }
-    }
-
-    if (eldestProfile != null) {
-      node = eldestProfile.getDistributedMember();
-    }
-    return node;
+        || (sender.isRunning() && (sender.startTime <= senderStartTimes.first()));
   }
 
   public void makePrimary() {
-    logger.info("{} : Starting as primary", this.sender);
-    AbstractGatewaySenderEventProcessor eventProcessor = this.sender.getEventProcessor();
+    logger.info("{} : Starting as primary", sender);
+    AbstractGatewaySenderEventProcessor eventProcessor = sender.getEventProcessor();
     if (eventProcessor != null) {
       eventProcessor.removeCacheListener();
     }
 
-    logger.info("{} : Becoming primary gateway sender", this.sender);
+    logger.info("{} : Becoming primary gateway sender", sender);
     notifyAndBecomePrimary();
-    new UpdateAttributesProcessor(this.sender).distribute(false);
+    new UpdateAttributesProcessor(sender).distribute(false);
   }
 
   public void notifyAndBecomePrimary() {
-    synchronized (this.primaryLock) {
+    synchronized (primaryLock) {
       setIsPrimary(true);
       notifyPrimaryLock();
     }
   }
 
   public void notifyPrimaryLock() {
-    synchronized (this.primaryLock) {
-      this.primaryLock.notifyAll();
+    synchronized (primaryLock) {
+      primaryLock.notifyAll();
     }
   }
 
   public void makeSecondary() {
     if (logger.isDebugEnabled()) {
       logger.debug("{}: Did not obtain the lock on {}. Starting as secondary gateway sender.",
-          this.sender, this.lockToken);
+          sender, lockToken);
     }
 
     // Set primary flag to false
     logger.info(
         "{} starting as secondary because primary gateway sender is available on member :{}",
-        new Object[] {this.sender.getId(), advisePrimaryGatewaySender()});
-    this.isPrimary = false;
-    new UpdateAttributesProcessor(this.sender).distribute(false);
+        new Object[] {sender.getId(), advisePrimaryGatewaySender()});
+    isPrimary = false;
+    new UpdateAttributesProcessor(sender).distribute(false);
   }
 
   public void launchLockObtainingVolunteerThread() {
     String threadName = "Gateway Sender Primary Lock Acquisition Thread Volunteer";
-    this.lockObtainingThread = new LoggingThread(threadName, () -> {
-      GatewaySenderAdvisor.this.sender.getLifeCycleLock().readLock().lock();
+    lockObtainingThread = new LoggingThread(threadName, () -> {
+      sender.getLifeCycleLock().readLock().lock();
       try {
         // Attempt to obtain the lock
-        if (!(GatewaySenderAdvisor.this.sender.isRunning())) {
+        if (!(sender.isRunning())) {
           return;
         }
         if (logger.isDebugEnabled()) {
-          logger.debug("{}: Obtaining the lock on {}", this, GatewaySenderAdvisor.this.lockToken);
+          logger.debug("{}: Obtaining the lock on {}", this, lockToken);
         }
 
         if (volunteerForPrimary()) {
           if (logger.isDebugEnabled()) {
             logger.debug("{}: Obtained the lock on {}", this,
-                GatewaySenderAdvisor.this.lockToken);
+                lockToken);
           }
-          logger.info("{} is becoming primary gateway Sender.",
-              GatewaySenderAdvisor.this);
+          logger.info("{} is becoming primary gateway Sender.", this);
 
           // As soon as the lock is obtained, set primary
-          GatewaySenderAdvisor.this.makePrimary();
+          makePrimary();
         }
       } catch (CancelException e) {
         // no action necessary
@@ -472,14 +444,14 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
         if (!sender.getStopper().isCancelInProgress()) {
           logger.fatal(String.format(
               "%s: The thread to obtain the failover lock was interrupted. This gateway sender will never become the primary.",
-              GatewaySenderAdvisor.this),
+              this),
               e);
         }
       } finally {
-        GatewaySenderAdvisor.this.sender.getLifeCycleLock().readLock().unlock();
+        sender.getLifeCycleLock().readLock().unlock();
       }
     });
-    this.lockObtainingThread.start();
+    lockObtainingThread.start();
   }
 
   public void waitToBecomePrimary(AbstractGatewaySenderEventProcessor callingProcessor)
@@ -487,10 +459,10 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
     if (isPrimary()) {
       return;
     }
-    synchronized (this.primaryLock) {
-      logger.info("{} : Waiting to become primary gateway", this.sender.getId());
+    synchronized (primaryLock) {
+      logger.info("{} : Waiting to become primary gateway", sender.getId());
       while (!isPrimary()) {
-        this.primaryLock.wait(1000);
+        primaryLock.wait(1000);
         if (sender.getEventProcessor() != null && callingProcessor.isStopped()) {
           logger.info("The event processor is stopped, not to wait for being primary any more.");
           return;
@@ -527,11 +499,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
 
     public boolean manualStart;
 
-    public ArrayList<String> eventFiltersClassNames = new ArrayList<String>();
+    public ArrayList<String> eventFiltersClassNames = new ArrayList<>();
 
-    public ArrayList<String> transFiltersClassNames = new ArrayList<String>();
+    public ArrayList<String> transFiltersClassNames = new ArrayList<>();
 
-    public ArrayList<String> senderEventListenerClassNames = new ArrayList<String>();
+    public ArrayList<String> senderEventListenerClassNames = new ArrayList<>();
 
     public boolean isDiskSynchronous;
 
@@ -553,47 +525,47 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
     public void fromData(DataInput in,
         DeserializationContext context) throws IOException, ClassNotFoundException {
       fromDataPre_GEODE_1_14_0_0(in, context);
-      this.enforceThreadsConnectSameReceiver = in.readBoolean();
+      enforceThreadsConnectSameReceiver = in.readBoolean();
     }
 
     public void fromDataPre_GEODE_1_14_0_0(DataInput in,
         DeserializationContext context) throws IOException, ClassNotFoundException {
       super.fromData(in, context);
-      this.Id = DataSerializer.readString(in);
-      this.startTime = in.readLong();
-      this.remoteDSId = in.readInt();
-      this.isRunning = in.readBoolean();
-      this.isPrimary = in.readBoolean();
-      this.isParallel = in.readBoolean();
-      this.isBatchConflationEnabled = in.readBoolean();
-      this.isPersistenceEnabled = in.readBoolean();
-      this.alertThreshold = in.readInt();
-      this.manualStart = in.readBoolean();
-      this.eventFiltersClassNames = DataSerializer.readArrayList(in);
-      this.transFiltersClassNames = DataSerializer.readArrayList(in);
-      this.senderEventListenerClassNames = DataSerializer.readArrayList(in);
-      this.isDiskSynchronous = in.readBoolean();
-      this.dispatcherThreads = in.readInt();
+      Id = DataSerializer.readString(in);
+      startTime = in.readLong();
+      remoteDSId = in.readInt();
+      isRunning = in.readBoolean();
+      isPrimary = in.readBoolean();
+      isParallel = in.readBoolean();
+      isBatchConflationEnabled = in.readBoolean();
+      isPersistenceEnabled = in.readBoolean();
+      alertThreshold = in.readInt();
+      manualStart = in.readBoolean();
+      eventFiltersClassNames = DataSerializer.readArrayList(in);
+      transFiltersClassNames = DataSerializer.readArrayList(in);
+      senderEventListenerClassNames = DataSerializer.readArrayList(in);
+      isDiskSynchronous = in.readBoolean();
+      dispatcherThreads = in.readInt();
       if (StaticSerialization.getVersionForDataStream(in).isOlderThan(KnownVersion.GFE_90)) {
         Gateway.OrderPolicy oldOrderPolicy = DataSerializer.readObject(in);
         if (oldOrderPolicy != null) {
           if (oldOrderPolicy.name().equals(OrderPolicy.KEY.name())) {
-            this.orderPolicy = OrderPolicy.KEY;
+            orderPolicy = OrderPolicy.KEY;
           } else if (oldOrderPolicy.name().equals(OrderPolicy.THREAD.name())) {
-            this.orderPolicy = OrderPolicy.THREAD;
+            orderPolicy = OrderPolicy.THREAD;
           } else {
-            this.orderPolicy = OrderPolicy.PARTITION;
+            orderPolicy = OrderPolicy.PARTITION;
           }
         } else {
-          this.orderPolicy = null;
+          orderPolicy = null;
         }
       } else {
-        this.orderPolicy = DataSerializer.readObject(in);
+        orderPolicy = DataSerializer.readObject(in);
       }
       boolean serverLocationFound = DataSerializer.readPrimitiveBoolean(in);
       if (serverLocationFound) {
-        this.serverLocation = new ServerLocation();
-        InternalDataSerializer.invokeFromData(this.serverLocation, in);
+        serverLocation = new ServerLocation();
+        InternalDataSerializer.invokeFromData(serverLocation, in);
       }
     }
 
@@ -623,8 +595,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
       out.writeBoolean(isDiskSynchronous);
       out.writeInt(dispatcherThreads);
       if (StaticSerialization.getVersionForDataStream(out).isOlderThan(KnownVersion.GFE_90)
-          && this.orderPolicy != null) {
-        String orderPolicyName = this.orderPolicy.name();
+          && orderPolicy != null) {
+        String orderPolicyName = orderPolicy.name();
         if (orderPolicyName.equals(Gateway.OrderPolicy.KEY.name())) {
           DataSerializer.writeObject(Gateway.OrderPolicy.KEY, out);
         } else if (orderPolicyName.equals(Gateway.OrderPolicy.THREAD.name())) {
@@ -635,7 +607,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
       } else {
         DataSerializer.writeObject(orderPolicy, out);
       }
-      boolean serverLocationFound = (this.serverLocation != null);
+      boolean serverLocationFound = (serverLocation != null);
       DataSerializer.writePrimitiveBoolean(serverLocationFound, out);
       if (serverLocationFound) {
         InternalDataSerializer.invokeToData(serverLocation, out);
@@ -669,15 +641,15 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
     @Override
     public void fillInToString(StringBuilder sb) {
       super.fillInToString(sb);
-      sb.append("; id=" + this.Id);
-      sb.append("; remoteDSName=" + this.remoteDSId);
-      sb.append("; isRunning=" + this.isRunning);
-      sb.append("; isPrimary=" + this.isPrimary);
+      sb.append("; id=").append(Id);
+      sb.append("; remoteDSName=").append(remoteDSId);
+      sb.append("; isRunning=").append(isRunning);
+      sb.append("; isPrimary=").append(isPrimary);
     }
   }
 
   public InternalDistributedMember advisePrimaryGatewaySender() {
-    Profile[] snapshot = this.profiles;
+    Profile[] snapshot = profiles;
     for (Profile profile : snapshot) {
       GatewaySenderProfile sp = (GatewaySenderProfile) profile;
       if (!sp.isParallel && sp.isPrimary) {
@@ -693,7 +665,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
 
   @Override
   public void close() {
-    new UpdateAttributesProcessor(this.getAdvisee(), true).distribute(false);
+    new UpdateAttributesProcessor(getAdvisee(), true).distribute(false);
     super.close();
   }
 }