You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2021/08/17 18:17:40 UTC
[geode] 08/18: GEODE-6588: Cleanup GatewaySenderAdvisor
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit fe190e6b99fedbeadd92c73937aa57aa547a9ce0
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Thu May 20 15:09:56 2021 -0700
GEODE-6588: Cleanup GatewaySenderAdvisor
---
.../distributed/internal/locks/DLockService.java | 2 +
.../internal/cache/wan/GatewaySenderAdvisor.java | 198 +++++++++------------
2 files changed, 87 insertions(+), 113 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
index 3813c16..652245c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
@@ -2720,6 +2721,7 @@ public class DLockService extends DistributedLockService {
*
* @see org.apache.geode.distributed.DistributedLockService#create(String, DistributedSystem)
*/
+ @NotNull
public static DistributedLockService create(String serviceName, InternalDistributedSystem ds,
boolean distributed, boolean destroyOnDisconnect, boolean automateFreeResources)
throws IllegalArgumentException, IllegalStateException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
index 401296c..bf50c63 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
@@ -41,7 +41,6 @@ import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
@@ -65,12 +64,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
private Thread lockObtainingThread;
- private AbstractGatewaySender sender;
+ private final AbstractGatewaySender sender;
private GatewaySenderAdvisor(DistributionAdvisee sender) {
super(sender);
this.sender = (AbstractGatewaySender) sender;
- this.lockToken = getDLockServiceName() + "-token";
+ lockToken = getDLockServiceName() + "-token";
}
public static GatewaySenderAdvisor createGatewaySenderAdvisor(DistributionAdvisee sender) {
@@ -80,11 +79,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
}
public String getDLockServiceName() {
- return getClass().getName() + "_" + this.sender.getId();
+ return getClass().getName() + "_" + sender.getId();
}
public Thread getLockObtainingThread() {
- return this.lockObtainingThread;
+ return lockObtainingThread;
}
/** Instantiate new Sender profile for this member */
@@ -163,7 +162,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
}
}
- List<String> senderEventFilterClassNames = new ArrayList<String>();
+ List<String> senderEventFilterClassNames = new ArrayList<>();
for (org.apache.geode.cache.wan.GatewayEventFilter filter : sender.getGatewayEventFilters()) {
senderEventFilterClassNames.add(filter.getClass().getName());
}
@@ -183,7 +182,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
}
}
- Set<String> senderTransportFilterClassNames = new LinkedHashSet<String>();
+ Set<String> senderTransportFilterClassNames = new LinkedHashSet<>();
for (GatewayTransportFilter filter : sender.getGatewayTransportFilters()) {
senderTransportFilterClassNames.add(filter.getClass().getName());
}
@@ -205,7 +204,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
}
}
}
- List<String> senderEventListenerClassNames = new ArrayList<String>();
+ List<String> senderEventListenerClassNames = new ArrayList<>();
for (AsyncEventListener listener : sender.getAsyncEventListeners()) {
senderEventListenerClassNames.add(listener.getClass().getName());
}
@@ -259,13 +258,13 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
return;
}
// IF this sender is not primary
- if (!this.sender.isPrimary()) {
+ if (!sender.isPrimary()) {
if (!adviseEldestGatewaySender()) {// AND this is not the eldest
// sender
if (logger.isDebugEnabled()) {
logger.debug(
"Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...",
- this.sender);
+ sender);
}
return;
}
@@ -273,7 +272,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
}
} else {
if (sp.serverLocation != null) {
- this.sender.setServerLocation(sp.serverLocation);
+ sender.setServerLocation(sp.serverLocation);
}
}
}
@@ -293,12 +292,12 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
if (advisePrimaryGatewaySender() != null) {
return;
}
- if (!this.sender.isPrimary()) {// IF this sender is not primary
+ if (!sender.isPrimary()) {// IF this sender is not primary
if (!adviseEldestGatewaySender()) {// AND this is not the eldest sender
if (logger.isDebugEnabled()) {
logger.debug(
"Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...",
- this.sender);
+ sender);
}
return;
}
@@ -309,25 +308,24 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
}
public boolean isPrimary() {
- return sender.isParallel() || this.isPrimary;
+ return sender.isParallel() || isPrimary;
}
public void initDLockService() {
- InternalDistributedSystem ds = this.sender.getCache().getInternalDistributedSystem();
+ InternalDistributedSystem ds = sender.getCache().getInternalDistributedSystem();
String dlsName = getDLockServiceName();
- this.lockService = DistributedLockService.getServiceNamed(dlsName);
- if (this.lockService == null) {
- this.lockService = DLockService.create(dlsName, ds, true, true, true);
+ lockService = DistributedLockService.getServiceNamed(dlsName);
+ if (lockService == null) {
+ lockService = DLockService.create(dlsName, ds, true, true, true);
}
- Assert.assertTrue(this.lockService != null);
if (logger.isDebugEnabled()) {
- logger.debug("{}: Obtained DistributedLockService: {}", this, this.lockService);
+ logger.debug("{}: Obtained DistributedLockService: {}", this, lockService);
}
}
public boolean volunteerForPrimary() {
if (logger.isDebugEnabled()) {
- logger.debug("Sender : {} is volunteering for Primary ", this.sender.getId());
+ logger.debug("Sender : {} is volunteering for Primary ", sender.getId());
}
if (advisePrimaryGatewaySender() == null) {
@@ -335,16 +333,16 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
if (logger.isDebugEnabled()) {
logger.debug(
"Sender {} is not the eldest in the system. Giving preference to eldest sender to become primary...",
- this.sender);
+ sender);
}
return false;
}
if (logger.isDebugEnabled()) {
logger.debug("Sender : {} no Primary available. So going to acquire distributed lock",
- this.sender);
+ sender);
}
- this.lockService.lock(this.lockToken, 10000, -1);
- return this.lockService.isHeldByCurrentThread(this.lockToken);
+ lockService.lock(lockToken, 10000, -1);
+ return lockService.isHeldByCurrentThread(lockToken);
}
return false;
}
@@ -357,11 +355,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
* @return boolean true if this eldest sender; false otherwise
*/
private boolean adviseEldestGatewaySender() {
- Profile[] snapshot = this.profiles;
+ Profile[] snapshot = profiles;
// sender with minimum startTime is eldest. Find out the minimum start time
// of remote senders.
- TreeSet<Long> senderStartTimes = new TreeSet<Long>();
+ TreeSet<Long> senderStartTimes = new TreeSet<>();
for (Profile profile : snapshot) {
GatewaySenderProfile sp = (GatewaySenderProfile) profile;
if (!sp.isParallel && sp.isRunning) {
@@ -375,96 +373,70 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
// it should give up
// and allow existing running senders to volunteer
return (senderStartTimes.isEmpty())
- || (this.sender.isRunning() && (this.sender.startTime <= senderStartTimes.first()));
- }
-
- private InternalDistributedMember adviseEldestGatewaySenderNode() {
- Profile[] snapshot = this.profiles;
-
- // sender with minimum startTime is eldest. Find out the minimum start time
- // of remote senders.
- InternalDistributedMember node = null;
- GatewaySenderProfile eldestProfile = null;
- for (Profile profile : snapshot) {
- GatewaySenderProfile sp = (GatewaySenderProfile) profile;
- if (!sp.isParallel && sp.isRunning) {
- if (eldestProfile == null) {
- eldestProfile = sp;
- }
- if (sp.startTime < eldestProfile.startTime) {
- eldestProfile = sp;
- }
- }
- }
-
- if (eldestProfile != null) {
- node = eldestProfile.getDistributedMember();
- }
- return node;
+ || (sender.isRunning() && (sender.startTime <= senderStartTimes.first()));
}
public void makePrimary() {
- logger.info("{} : Starting as primary", this.sender);
- AbstractGatewaySenderEventProcessor eventProcessor = this.sender.getEventProcessor();
+ logger.info("{} : Starting as primary", sender);
+ AbstractGatewaySenderEventProcessor eventProcessor = sender.getEventProcessor();
if (eventProcessor != null) {
eventProcessor.removeCacheListener();
}
- logger.info("{} : Becoming primary gateway sender", this.sender);
+ logger.info("{} : Becoming primary gateway sender", sender);
notifyAndBecomePrimary();
- new UpdateAttributesProcessor(this.sender).distribute(false);
+ new UpdateAttributesProcessor(sender).distribute(false);
}
public void notifyAndBecomePrimary() {
- synchronized (this.primaryLock) {
+ synchronized (primaryLock) {
setIsPrimary(true);
notifyPrimaryLock();
}
}
public void notifyPrimaryLock() {
- synchronized (this.primaryLock) {
- this.primaryLock.notifyAll();
+ synchronized (primaryLock) {
+ primaryLock.notifyAll();
}
}
public void makeSecondary() {
if (logger.isDebugEnabled()) {
logger.debug("{}: Did not obtain the lock on {}. Starting as secondary gateway sender.",
- this.sender, this.lockToken);
+ sender, lockToken);
}
// Set primary flag to false
logger.info(
"{} starting as secondary because primary gateway sender is available on member :{}",
- new Object[] {this.sender.getId(), advisePrimaryGatewaySender()});
- this.isPrimary = false;
- new UpdateAttributesProcessor(this.sender).distribute(false);
+ new Object[] {sender.getId(), advisePrimaryGatewaySender()});
+ isPrimary = false;
+ new UpdateAttributesProcessor(sender).distribute(false);
}
public void launchLockObtainingVolunteerThread() {
String threadName = "Gateway Sender Primary Lock Acquisition Thread Volunteer";
- this.lockObtainingThread = new LoggingThread(threadName, () -> {
- GatewaySenderAdvisor.this.sender.getLifeCycleLock().readLock().lock();
+ lockObtainingThread = new LoggingThread(threadName, () -> {
+ sender.getLifeCycleLock().readLock().lock();
try {
// Attempt to obtain the lock
- if (!(GatewaySenderAdvisor.this.sender.isRunning())) {
+ if (!(sender.isRunning())) {
return;
}
if (logger.isDebugEnabled()) {
- logger.debug("{}: Obtaining the lock on {}", this, GatewaySenderAdvisor.this.lockToken);
+ logger.debug("{}: Obtaining the lock on {}", this, lockToken);
}
if (volunteerForPrimary()) {
if (logger.isDebugEnabled()) {
logger.debug("{}: Obtained the lock on {}", this,
- GatewaySenderAdvisor.this.lockToken);
+ lockToken);
}
- logger.info("{} is becoming primary gateway Sender.",
- GatewaySenderAdvisor.this);
+ logger.info("{} is becoming primary gateway Sender.", this);
// As soon as the lock is obtained, set primary
- GatewaySenderAdvisor.this.makePrimary();
+ makePrimary();
}
} catch (CancelException e) {
// no action necessary
@@ -472,14 +444,14 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
if (!sender.getStopper().isCancelInProgress()) {
logger.fatal(String.format(
"%s: The thread to obtain the failover lock was interrupted. This gateway sender will never become the primary.",
- GatewaySenderAdvisor.this),
+ this),
e);
}
} finally {
- GatewaySenderAdvisor.this.sender.getLifeCycleLock().readLock().unlock();
+ sender.getLifeCycleLock().readLock().unlock();
}
});
- this.lockObtainingThread.start();
+ lockObtainingThread.start();
}
public void waitToBecomePrimary(AbstractGatewaySenderEventProcessor callingProcessor)
@@ -487,10 +459,10 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
if (isPrimary()) {
return;
}
- synchronized (this.primaryLock) {
- logger.info("{} : Waiting to become primary gateway", this.sender.getId());
+ synchronized (primaryLock) {
+ logger.info("{} : Waiting to become primary gateway", sender.getId());
while (!isPrimary()) {
- this.primaryLock.wait(1000);
+ primaryLock.wait(1000);
if (sender.getEventProcessor() != null && callingProcessor.isStopped()) {
logger.info("The event processor is stopped, not to wait for being primary any more.");
return;
@@ -527,11 +499,11 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
public boolean manualStart;
- public ArrayList<String> eventFiltersClassNames = new ArrayList<String>();
+ public ArrayList<String> eventFiltersClassNames = new ArrayList<>();
- public ArrayList<String> transFiltersClassNames = new ArrayList<String>();
+ public ArrayList<String> transFiltersClassNames = new ArrayList<>();
- public ArrayList<String> senderEventListenerClassNames = new ArrayList<String>();
+ public ArrayList<String> senderEventListenerClassNames = new ArrayList<>();
public boolean isDiskSynchronous;
@@ -553,47 +525,47 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
fromDataPre_GEODE_1_14_0_0(in, context);
- this.enforceThreadsConnectSameReceiver = in.readBoolean();
+ enforceThreadsConnectSameReceiver = in.readBoolean();
}
public void fromDataPre_GEODE_1_14_0_0(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
- this.Id = DataSerializer.readString(in);
- this.startTime = in.readLong();
- this.remoteDSId = in.readInt();
- this.isRunning = in.readBoolean();
- this.isPrimary = in.readBoolean();
- this.isParallel = in.readBoolean();
- this.isBatchConflationEnabled = in.readBoolean();
- this.isPersistenceEnabled = in.readBoolean();
- this.alertThreshold = in.readInt();
- this.manualStart = in.readBoolean();
- this.eventFiltersClassNames = DataSerializer.readArrayList(in);
- this.transFiltersClassNames = DataSerializer.readArrayList(in);
- this.senderEventListenerClassNames = DataSerializer.readArrayList(in);
- this.isDiskSynchronous = in.readBoolean();
- this.dispatcherThreads = in.readInt();
+ Id = DataSerializer.readString(in);
+ startTime = in.readLong();
+ remoteDSId = in.readInt();
+ isRunning = in.readBoolean();
+ isPrimary = in.readBoolean();
+ isParallel = in.readBoolean();
+ isBatchConflationEnabled = in.readBoolean();
+ isPersistenceEnabled = in.readBoolean();
+ alertThreshold = in.readInt();
+ manualStart = in.readBoolean();
+ eventFiltersClassNames = DataSerializer.readArrayList(in);
+ transFiltersClassNames = DataSerializer.readArrayList(in);
+ senderEventListenerClassNames = DataSerializer.readArrayList(in);
+ isDiskSynchronous = in.readBoolean();
+ dispatcherThreads = in.readInt();
if (StaticSerialization.getVersionForDataStream(in).isOlderThan(KnownVersion.GFE_90)) {
Gateway.OrderPolicy oldOrderPolicy = DataSerializer.readObject(in);
if (oldOrderPolicy != null) {
if (oldOrderPolicy.name().equals(OrderPolicy.KEY.name())) {
- this.orderPolicy = OrderPolicy.KEY;
+ orderPolicy = OrderPolicy.KEY;
} else if (oldOrderPolicy.name().equals(OrderPolicy.THREAD.name())) {
- this.orderPolicy = OrderPolicy.THREAD;
+ orderPolicy = OrderPolicy.THREAD;
} else {
- this.orderPolicy = OrderPolicy.PARTITION;
+ orderPolicy = OrderPolicy.PARTITION;
}
} else {
- this.orderPolicy = null;
+ orderPolicy = null;
}
} else {
- this.orderPolicy = DataSerializer.readObject(in);
+ orderPolicy = DataSerializer.readObject(in);
}
boolean serverLocationFound = DataSerializer.readPrimitiveBoolean(in);
if (serverLocationFound) {
- this.serverLocation = new ServerLocation();
- InternalDataSerializer.invokeFromData(this.serverLocation, in);
+ serverLocation = new ServerLocation();
+ InternalDataSerializer.invokeFromData(serverLocation, in);
}
}
@@ -623,8 +595,8 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
out.writeBoolean(isDiskSynchronous);
out.writeInt(dispatcherThreads);
if (StaticSerialization.getVersionForDataStream(out).isOlderThan(KnownVersion.GFE_90)
- && this.orderPolicy != null) {
- String orderPolicyName = this.orderPolicy.name();
+ && orderPolicy != null) {
+ String orderPolicyName = orderPolicy.name();
if (orderPolicyName.equals(Gateway.OrderPolicy.KEY.name())) {
DataSerializer.writeObject(Gateway.OrderPolicy.KEY, out);
} else if (orderPolicyName.equals(Gateway.OrderPolicy.THREAD.name())) {
@@ -635,7 +607,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
} else {
DataSerializer.writeObject(orderPolicy, out);
}
- boolean serverLocationFound = (this.serverLocation != null);
+ boolean serverLocationFound = (serverLocation != null);
DataSerializer.writePrimitiveBoolean(serverLocationFound, out);
if (serverLocationFound) {
InternalDataSerializer.invokeToData(serverLocation, out);
@@ -669,15 +641,15 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
@Override
public void fillInToString(StringBuilder sb) {
super.fillInToString(sb);
- sb.append("; id=" + this.Id);
- sb.append("; remoteDSName=" + this.remoteDSId);
- sb.append("; isRunning=" + this.isRunning);
- sb.append("; isPrimary=" + this.isPrimary);
+ sb.append("; id=").append(Id);
+ sb.append("; remoteDSName=").append(remoteDSId);
+ sb.append("; isRunning=").append(isRunning);
+ sb.append("; isPrimary=").append(isPrimary);
}
}
public InternalDistributedMember advisePrimaryGatewaySender() {
- Profile[] snapshot = this.profiles;
+ Profile[] snapshot = profiles;
for (Profile profile : snapshot) {
GatewaySenderProfile sp = (GatewaySenderProfile) profile;
if (!sp.isParallel && sp.isPrimary) {
@@ -693,7 +665,7 @@ public class GatewaySenderAdvisor extends DistributionAdvisor {
@Override
public void close() {
- new UpdateAttributesProcessor(this.getAdvisee(), true).distribute(false);
+ new UpdateAttributesProcessor(getAdvisee(), true).distribute(false);
super.close();
}
}