You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ji...@apache.org on 2022/06/08 22:42:37 UTC
[geode] 05/22: GEODE-9132: Cleanup PartitionedRegionClearMessage
This is an automated email from the ASF dual-hosted git repository.
jinmeiliao pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git
commit f0062cf2168686f46508631123ec825fb485799e
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 9 17:24:26 2021 -0700
GEODE-9132: Cleanup PartitionedRegionClearMessage
* Use descriptive names for variables and methods
* Use Objects.requireNonNull instead of Assert.assertTrue
* Remove unnecessary uses of final, this, and super
* Use static logger
* Reformat some lines with weird formatting
---
.../partitioned/PRClearCreateIndexDUnitTest.java | 5 +-
...ionedRegionAfterClearNotificationDUnitTest.java | 4 +-
...itionedRegionClearWithAlterRegionDUnitTest.java | 2 +-
...gionClearWithConcurrentOperationsDUnitTest.java | 2 +-
.../cache/PartitionedRegionClearMessage.java | 225 ++++++++++-----------
.../internal/cache/PartitionRegionClearHATest.java | 2 +-
6 files changed, 118 insertions(+), 122 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
index 1c94c2dec7..423932d6d4 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
@@ -236,10 +236,11 @@ public class PRClearCreateIndexDUnitTest implements Serializable {
if (message instanceof PartitionedRegionClearMessage) {
PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
if (clearMessage
- .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+ .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
lock_others = true;
}
- if (clearMessage.getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
+ if (clearMessage
+ .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
clear_others = true;
}
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
index 237b6a8171..7979cfaa16 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
@@ -326,7 +326,7 @@ public class PartitionedRegionAfterClearNotificationDUnitTest implements Seriali
public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
if (message instanceof PartitionedRegionClearMessage) {
if (((PartitionedRegionClearMessage) message)
- .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+ .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
DistributionMessageObserver.setInstance(null);
getBlackboard().signalGate("CLOSE_CACHE");
try {
@@ -348,7 +348,7 @@ public class PartitionedRegionAfterClearNotificationDUnitTest implements Seriali
public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
if (message instanceof PartitionedRegionClearMessage) {
if (((PartitionedRegionClearMessage) message)
- .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+ .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
DistributionMessageObserver.setInstance(null);
getBlackboard().signalGate("CLOSE_CACHE");
try {
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java
index fb74eb32a4..564706eba8 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java
@@ -759,7 +759,7 @@ public class PartitionedRegionClearWithAlterRegionDUnitTest implements Serializa
private void shutdownMember(DistributionMessage message) {
if (message instanceof PartitionedRegionClearMessage) {
if (((PartitionedRegionClearMessage) message)
- .getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
+ .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
DistributionMessageObserver.setInstance(null);
InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
MembershipManagerHelper
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index fdb91c7005..77537cbda3 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -703,7 +703,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
private void shutdownMember(DistributionMessage message) {
if (message instanceof PartitionedRegionClearMessage) {
if (((PartitionedRegionClearMessage) message)
- .getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
+ .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
DistributionMessageObserver.setInstance(null);
InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
MembershipManagerHelper
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
index 724256b365..36cdcb6b3d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
@@ -12,14 +12,16 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Objects;
import java.util.Set;
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.Operation;
@@ -32,7 +34,6 @@ import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.ReplySender;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.cache.partitioned.PartitionMessage;
@@ -41,96 +42,92 @@ import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
-/**
- * this message is for operations no the partition region level, could be sent by any originating
- * member to the other members hosting this partition region
- */
public class PartitionedRegionClearMessage extends PartitionMessage {
+ private static final Logger logger = LogService.getLogger();
public enum OperationType {
OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR,
}
- private Object cbArg;
-
- private OperationType op;
-
- private EventID eventID;
-
+ private Object callbackArgument;
+ private OperationType operationType;
+ private EventID eventId;
private PartitionedRegion partitionedRegion;
-
private Set<Integer> bucketsCleared;
- @Override
- public EventID getEventID() {
- return eventID;
+ public PartitionedRegionClearMessage() {
+ // nothing
}
- public PartitionedRegionClearMessage() {}
+ PartitionedRegionClearMessage(Set<InternalDistributedMember> recipients,
+ PartitionedRegion partitionedRegion, ReplyProcessor21 replyProcessor21,
+ PartitionedRegionClearMessage.OperationType operationType, RegionEventImpl regionEvent) {
+ super(recipients, partitionedRegion.getPRId(), replyProcessor21);
+ this.partitionedRegion = partitionedRegion;
+ this.operationType = operationType;
+ callbackArgument = regionEvent.getRawCallbackArgument();
+ eventId = regionEvent.getEventId();
+ }
- PartitionedRegionClearMessage(Set<InternalDistributedMember> recipients, PartitionedRegion region,
- ReplyProcessor21 processor, PartitionedRegionClearMessage.OperationType operationType,
- final RegionEventImpl event) {
- super(recipients, region.getPRId(), processor);
- partitionedRegion = region;
- op = operationType;
- cbArg = event.getRawCallbackArgument();
- eventID = event.getEventId();
+ @Override
+ public EventID getEventID() {
+ return eventId;
}
- public OperationType getOp() {
- return op;
+ public OperationType getOperationType() {
+ return operationType;
}
public void send() {
- Assert.assertTrue(getRecipients() != null, "ClearMessage NULL recipients set");
+ Objects.requireNonNull(getRecipients(), "ClearMessage NULL recipients set");
+
setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed());
partitionedRegion.getDistributionManager().putOutgoing(this);
}
@Override
- protected Throwable processCheckForPR(PartitionedRegion pr,
+ protected Throwable processCheckForPR(PartitionedRegion partitionedRegion,
DistributionManager distributionManager) {
- if (pr != null && !pr.getDistributionAdvisor().isInitialized()) {
+ if (partitionedRegion != null && !partitionedRegion.getDistributionAdvisor().isInitialized()) {
return new ForceReattemptException(
String.format("%s : could not find partitioned region with Id %s",
distributionManager.getDistributionManagerId(),
- pr.getRegionIdentifier()));
+ partitionedRegion.getRegionIdentifier()));
}
return null;
}
@Override
- protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm,
- PartitionedRegion partitionedRegion,
- long startTime) throws CacheException {
-
+ protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
+ PartitionedRegion partitionedRegion, long startTime) throws CacheException {
if (partitionedRegion == null) {
return true;
}
-
if (partitionedRegion.isDestroyed()) {
return true;
}
- if (op == OperationType.OP_LOCK_FOR_PR_CLEAR) {
+ if (operationType == OperationType.OP_LOCK_FOR_PR_CLEAR) {
partitionedRegion.getPartitionedRegionClear().obtainClearLockLocal(getSender());
- } else if (op == OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
+ } else if (operationType == OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
partitionedRegion.getPartitionedRegionClear().releaseClearLockLocal();
} else {
RegionEventImpl event =
- new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, this.cbArg, true,
- partitionedRegion.getMyId(),
- getEventID());
+ new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, callbackArgument, true,
+ partitionedRegion.getMyId(), getEventID());
bucketsCleared = partitionedRegion.getPartitionedRegionClear().clearRegionLocal(event);
}
return true;
}
@Override
- protected void appendFields(StringBuilder buff) {
- super.appendFields(buff);
- buff.append(" cbArg=").append(this.cbArg).append(" op=").append(this.op);
+ protected void appendFields(StringBuilder stringBuilder) {
+ super.appendFields(stringBuilder);
+ stringBuilder
+ .append(" cbArg=")
+ .append(callbackArgument)
+ .append(" op=")
+ .append(operationType);
}
@Override
@@ -139,21 +136,32 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
}
@Override
- public void fromData(DataInput in,
- DeserializationContext context) throws IOException, ClassNotFoundException {
+ public void fromData(DataInput in, DeserializationContext context)
+ throws IOException, ClassNotFoundException {
super.fromData(in, context);
- this.cbArg = DataSerializer.readObject(in);
- op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
- eventID = DataSerializer.readObject(in);
+ callbackArgument = DataSerializer.readObject(in);
+ operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+ eventId = DataSerializer.readObject(in);
}
@Override
- public void toData(DataOutput out,
- SerializationContext context) throws IOException {
+ public void toData(DataOutput out, SerializationContext context) throws IOException {
super.toData(out, context);
- DataSerializer.writeObject(this.cbArg, out);
- out.writeByte(op.ordinal());
- DataSerializer.writeObject(eventID, out);
+ DataSerializer.writeObject(callbackArgument, out);
+ out.writeByte(operationType.ordinal());
+ DataSerializer.writeObject(eventId, out);
+ }
+
+ @Override
+ protected void sendReply(InternalDistributedMember recipient, int processorId,
+ DistributionManager distributionManager, ReplyException replyException,
+ PartitionedRegion partitionedRegion, long startTime) {
+ if (partitionedRegion != null && startTime > 0) {
+ partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
+ }
+ PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage
+ .send(recipient, processorId, getReplySender(distributionManager), operationType,
+ bucketsCleared, replyException);
}
/**
@@ -161,97 +169,85 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
* received from the "far side"
*/
public static class PartitionedRegionClearResponse extends ReplyProcessor21 {
+
CopyOnWriteHashSet<Integer> bucketsCleared = new CopyOnWriteHashSet<>();
public PartitionedRegionClearResponse(InternalDistributedSystem system,
- Set<InternalDistributedMember> initMembers) {
- super(system, initMembers);
+ Set<InternalDistributedMember> recipients) {
+ super(system, recipients);
}
@Override
- public void process(DistributionMessage msg) {
- if (msg instanceof PartitionedRegionClearReplyMessage) {
- Set<Integer> buckets = ((PartitionedRegionClearReplyMessage) msg).bucketsCleared;
+ public void process(DistributionMessage message) {
+ if (message instanceof PartitionedRegionClearReplyMessage) {
+ Set<Integer> buckets = ((PartitionedRegionClearReplyMessage) message).bucketsCleared;
if (buckets != null) {
bucketsCleared.addAll(buckets);
}
}
- super.process(msg, true);
- }
- }
-
- @Override
- protected void sendReply(InternalDistributedMember member, int processorId,
- DistributionManager distributionManager, ReplyException ex,
- PartitionedRegion partitionedRegion, long startTime) {
- if (partitionedRegion != null) {
- if (startTime > 0) {
- partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
- }
+ process(message, true);
}
- PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage
- .send(member, processorId, getReplySender(distributionManager), op, bucketsCleared,
- ex);
}
public static class PartitionedRegionClearReplyMessage extends ReplyMessage {
private Set<Integer> bucketsCleared;
- private OperationType op;
+ private OperationType operationType;
@Override
public boolean getInlineProcess() {
return true;
}
+ public static void send(InternalDistributedMember recipient, int processorId,
+ ReplySender replySender, OperationType operationType, Set<Integer> bucketsCleared,
+ ReplyException replyException) {
+ Objects.requireNonNull(recipient, "partitionedRegionClearReplyMessage NULL reply message");
+
+ PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage replyMessage =
+ new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId,
+ operationType, bucketsCleared, replyException);
+
+ replyMessage.setRecipient(recipient);
+ replySender.putOutgoing(replyMessage);
+ }
+
/**
* Empty constructor to conform to DataSerializable interface
*/
- public PartitionedRegionClearReplyMessage() {}
+ public PartitionedRegionClearReplyMessage() {
+ // Empty constructor to conform to DataSerializable interface
+ }
- private PartitionedRegionClearReplyMessage(int processorId, OperationType op,
- Set<Integer> bucketsCleared, ReplyException ex) {
- super();
+ private PartitionedRegionClearReplyMessage(int processorId, OperationType operationType,
+ Set<Integer> bucketsCleared, ReplyException replyException) {
this.bucketsCleared = bucketsCleared;
- this.op = op;
+ this.operationType = operationType;
setProcessorId(processorId);
- setException(ex);
- }
-
- /** Send an ack */
- public static void send(InternalDistributedMember recipient, int processorId, ReplySender dm,
- OperationType op, Set<Integer> bucketsCleared, ReplyException ex) {
-
- Assert.assertTrue(recipient != null, "partitionedRegionClearReplyMessage NULL reply message");
-
- PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage m =
- new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId, op,
- bucketsCleared, ex);
-
- m.setRecipient(recipient);
- dm.putOutgoing(m);
+ setException(replyException);
}
/**
* Processes this message. This method is invoked by the receiver of the message.
*
- * @param dm the distribution manager that is processing the message.
+ * @param distributionManager the distribution manager that is processing the message.
*/
@Override
- public void process(final DistributionManager dm, final ReplyProcessor21 rp) {
- final long startTime = getTimestamp();
+ public void process(DistributionManager distributionManager,
+ ReplyProcessor21 replyProcessor21) {
+ long startTime = getTimestamp();
- if (rp == null) {
- if (LogService.getLogger().isTraceEnabled(LogMarker.DM_VERBOSE)) {
- LogService.getLogger().trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
+ if (replyProcessor21 == null) {
+ if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
+ logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
}
return;
}
- rp.process(this);
+ replyProcessor21.process(this);
- dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
+ distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
}
@Override
@@ -260,30 +256,29 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
}
@Override
- public void fromData(DataInput in,
- DeserializationContext context) throws IOException, ClassNotFoundException {
+ public void fromData(DataInput in, DeserializationContext context)
+ throws IOException, ClassNotFoundException {
super.fromData(in, context);
- op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+ operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
bucketsCleared = DataSerializer.readObject(in);
}
@Override
- public void toData(DataOutput out,
- SerializationContext context) throws IOException {
+ public void toData(DataOutput out, SerializationContext context) throws IOException {
super.toData(out, context);
- out.writeByte(op.ordinal());
+ out.writeByte(operationType.ordinal());
DataSerializer.writeObject(bucketsCleared, out);
}
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
- sb.append("PartitionedRegionClearReplyMessage ")
- .append("processorId=").append(this.processorId)
+ return new StringBuilder()
+ .append("PartitionedRegionClearReplyMessage ")
+ .append("processorId=").append(processorId)
.append(" sender=").append(sender)
- .append(" bucketsCleared ").append(this.bucketsCleared)
- .append(" exception=").append(getException());
- return sb.toString();
+ .append(" bucketsCleared ").append(bucketsCleared)
+ .append(" exception=").append(getException())
+ .toString();
}
}
}
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java
index 7d0db0a17e..5497028f1e 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java
@@ -220,7 +220,7 @@ public class PartitionRegionClearHATest implements Serializable {
if (message instanceof PartitionedRegionClearMessage) {
PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
if (clearMessage
- .getOp() == PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
+ .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
try {
// count down to 1 so that we can go ahead and restart the server
latch.countDown();