You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/06/04 21:28:23 UTC

[geode] 05/18: GEODE-9132: Cleanup PartitionedRegionClearMessage

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/GEODE-7665
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 8df95fefc96069ce7a2d77a262e00a5eb5d631a2
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Fri Apr 9 17:24:26 2021 -0700

    GEODE-9132: Cleanup PartitionedRegionClearMessage
    
    * Use descriptive names for variables and methods
    * Use Objects.requireNonNull instead of Assert.assertTrue
    * Remove unnecessary uses of final, this, and super
    * Use static logger
    * Reformat some lines with weird formatting
---
 .../partitioned/PRClearCreateIndexDUnitTest.java   |   5 +-
 ...ionedRegionAfterClearNotificationDUnitTest.java |   4 +-
 ...itionedRegionClearWithAlterRegionDUnitTest.java |   2 +-
 ...gionClearWithConcurrentOperationsDUnitTest.java |   2 +-
 .../cache/PartitionedRegionClearMessage.java       | 225 ++++++++++-----------
 .../internal/cache/PartitionRegionClearHATest.java |   2 +-
 6 files changed, 118 insertions(+), 122 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
index 1c94c2d..423932d 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/cache/query/partitioned/PRClearCreateIndexDUnitTest.java
@@ -236,10 +236,11 @@ public class PRClearCreateIndexDUnitTest implements Serializable {
       if (message instanceof PartitionedRegionClearMessage) {
         PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
         if (clearMessage
-            .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+            .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
           lock_others = true;
         }
-        if (clearMessage.getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
+        if (clearMessage
+            .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
           clear_others = true;
         }
       }
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
index 237b6a8..7979cfa 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionAfterClearNotificationDUnitTest.java
@@ -326,7 +326,7 @@ public class PartitionedRegionAfterClearNotificationDUnitTest implements Seriali
       public void beforeProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
         if (message instanceof PartitionedRegionClearMessage) {
           if (((PartitionedRegionClearMessage) message)
-              .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+              .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
             DistributionMessageObserver.setInstance(null);
             getBlackboard().signalGate("CLOSE_CACHE");
             try {
@@ -348,7 +348,7 @@ public class PartitionedRegionAfterClearNotificationDUnitTest implements Seriali
       public void afterProcessMessage(ClusterDistributionManager dm, DistributionMessage message) {
         if (message instanceof PartitionedRegionClearMessage) {
           if (((PartitionedRegionClearMessage) message)
-              .getOp() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
+              .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_LOCK_FOR_PR_CLEAR) {
             DistributionMessageObserver.setInstance(null);
             getBlackboard().signalGate("CLOSE_CACHE");
             try {
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java
index fb74eb3..564706e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithAlterRegionDUnitTest.java
@@ -759,7 +759,7 @@ public class PartitionedRegionClearWithAlterRegionDUnitTest implements Serializa
     private void shutdownMember(DistributionMessage message) {
       if (message instanceof PartitionedRegionClearMessage) {
         if (((PartitionedRegionClearMessage) message)
-            .getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
+            .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
           DistributionMessageObserver.setInstance(null);
           InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
           MembershipManagerHelper
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
index fdb91c7..77537cb 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
@@ -703,7 +703,7 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements
     private void shutdownMember(DistributionMessage message) {
       if (message instanceof PartitionedRegionClearMessage) {
         if (((PartitionedRegionClearMessage) message)
-            .getOp() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
+            .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_PR_CLEAR) {
           DistributionMessageObserver.setInstance(null);
           InternalDistributedSystem.getConnectedInstance().stopReconnectingNoDisconnect();
           MembershipManagerHelper
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
index 724256b..36cdcb6 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionClearMessage.java
@@ -12,14 +12,16 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Objects;
 import java.util.Set;
 
+import org.apache.logging.log4j.Logger;
+
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.Operation;
@@ -32,7 +34,6 @@ import org.apache.geode.distributed.internal.ReplyMessage;
 import org.apache.geode.distributed.internal.ReplyProcessor21;
 import org.apache.geode.distributed.internal.ReplySender;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.CopyOnWriteHashSet;
 import org.apache.geode.internal.NanoTimer;
 import org.apache.geode.internal.cache.partitioned.PartitionMessage;
@@ -41,96 +42,92 @@ import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
-/**
- * this message is for operations no the partition region level, could be sent by any originating
- * member to the other members hosting this partition region
- */
 public class PartitionedRegionClearMessage extends PartitionMessage {
+  private static final Logger logger = LogService.getLogger();
 
   public enum OperationType {
     OP_LOCK_FOR_PR_CLEAR, OP_UNLOCK_FOR_PR_CLEAR, OP_PR_CLEAR,
   }
 
-  private Object cbArg;
-
-  private OperationType op;
-
-  private EventID eventID;
-
+  private Object callbackArgument;
+  private OperationType operationType;
+  private EventID eventId;
   private PartitionedRegion partitionedRegion;
-
   private Set<Integer> bucketsCleared;
 
-  @Override
-  public EventID getEventID() {
-    return eventID;
+  public PartitionedRegionClearMessage() {
+    // nothing
   }
 
-  public PartitionedRegionClearMessage() {}
+  PartitionedRegionClearMessage(Set<InternalDistributedMember> recipients,
+      PartitionedRegion partitionedRegion, ReplyProcessor21 replyProcessor21,
+      PartitionedRegionClearMessage.OperationType operationType, RegionEventImpl regionEvent) {
+    super(recipients, partitionedRegion.getPRId(), replyProcessor21);
+    this.partitionedRegion = partitionedRegion;
+    this.operationType = operationType;
+    callbackArgument = regionEvent.getRawCallbackArgument();
+    eventId = regionEvent.getEventId();
+  }
 
-  PartitionedRegionClearMessage(Set<InternalDistributedMember> recipients, PartitionedRegion region,
-      ReplyProcessor21 processor, PartitionedRegionClearMessage.OperationType operationType,
-      final RegionEventImpl event) {
-    super(recipients, region.getPRId(), processor);
-    partitionedRegion = region;
-    op = operationType;
-    cbArg = event.getRawCallbackArgument();
-    eventID = event.getEventId();
+  @Override
+  public EventID getEventID() {
+    return eventId;
   }
 
-  public OperationType getOp() {
-    return op;
+  public OperationType getOperationType() {
+    return operationType;
   }
 
   public void send() {
-    Assert.assertTrue(getRecipients() != null, "ClearMessage NULL recipients set");
+    Objects.requireNonNull(getRecipients(), "ClearMessage NULL recipients set");
+
     setTransactionDistributed(partitionedRegion.getCache().getTxManager().isDistributed());
     partitionedRegion.getDistributionManager().putOutgoing(this);
   }
 
   @Override
-  protected Throwable processCheckForPR(PartitionedRegion pr,
+  protected Throwable processCheckForPR(PartitionedRegion partitionedRegion,
       DistributionManager distributionManager) {
-    if (pr != null && !pr.getDistributionAdvisor().isInitialized()) {
+    if (partitionedRegion != null && !partitionedRegion.getDistributionAdvisor().isInitialized()) {
       return new ForceReattemptException(
           String.format("%s : could not find partitioned region with Id %s",
               distributionManager.getDistributionManagerId(),
-              pr.getRegionIdentifier()));
+              partitionedRegion.getRegionIdentifier()));
     }
     return null;
   }
 
   @Override
-  protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm,
-      PartitionedRegion partitionedRegion,
-      long startTime) throws CacheException {
-
+  protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
+      PartitionedRegion partitionedRegion, long startTime) throws CacheException {
     if (partitionedRegion == null) {
       return true;
     }
-
     if (partitionedRegion.isDestroyed()) {
       return true;
     }
 
-    if (op == OperationType.OP_LOCK_FOR_PR_CLEAR) {
+    if (operationType == OperationType.OP_LOCK_FOR_PR_CLEAR) {
       partitionedRegion.getPartitionedRegionClear().obtainClearLockLocal(getSender());
-    } else if (op == OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
+    } else if (operationType == OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
       partitionedRegion.getPartitionedRegionClear().releaseClearLockLocal();
     } else {
       RegionEventImpl event =
-          new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, this.cbArg, true,
-              partitionedRegion.getMyId(),
-              getEventID());
+          new RegionEventImpl(partitionedRegion, Operation.REGION_CLEAR, callbackArgument, true,
+              partitionedRegion.getMyId(), getEventID());
       bucketsCleared = partitionedRegion.getPartitionedRegionClear().clearRegionLocal(event);
     }
     return true;
   }
 
   @Override
-  protected void appendFields(StringBuilder buff) {
-    super.appendFields(buff);
-    buff.append(" cbArg=").append(this.cbArg).append(" op=").append(this.op);
+  protected void appendFields(StringBuilder stringBuilder) {
+    super.appendFields(stringBuilder);
+    stringBuilder
+        .append(" cbArg=")
+        .append(callbackArgument)
+        .append(" op=")
+        .append(operationType);
   }
 
   @Override
@@ -139,21 +136,32 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
   }
 
   @Override
-  public void fromData(DataInput in,
-      DeserializationContext context) throws IOException, ClassNotFoundException {
+  public void fromData(DataInput in, DeserializationContext context)
+      throws IOException, ClassNotFoundException {
     super.fromData(in, context);
-    this.cbArg = DataSerializer.readObject(in);
-    op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
-    eventID = DataSerializer.readObject(in);
+    callbackArgument = DataSerializer.readObject(in);
+    operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+    eventId = DataSerializer.readObject(in);
   }
 
   @Override
-  public void toData(DataOutput out,
-      SerializationContext context) throws IOException {
+  public void toData(DataOutput out, SerializationContext context) throws IOException {
     super.toData(out, context);
-    DataSerializer.writeObject(this.cbArg, out);
-    out.writeByte(op.ordinal());
-    DataSerializer.writeObject(eventID, out);
+    DataSerializer.writeObject(callbackArgument, out);
+    out.writeByte(operationType.ordinal());
+    DataSerializer.writeObject(eventId, out);
+  }
+
+  @Override
+  protected void sendReply(InternalDistributedMember recipient, int processorId,
+      DistributionManager distributionManager, ReplyException replyException,
+      PartitionedRegion partitionedRegion, long startTime) {
+    if (partitionedRegion != null && startTime > 0) {
+      partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
+    }
+    PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage
+        .send(recipient, processorId, getReplySender(distributionManager), operationType,
+            bucketsCleared, replyException);
   }
 
   /**
@@ -161,97 +169,85 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
    * received from the "far side"
    */
   public static class PartitionedRegionClearResponse extends ReplyProcessor21 {
+
     CopyOnWriteHashSet<Integer> bucketsCleared = new CopyOnWriteHashSet<>();
 
     public PartitionedRegionClearResponse(InternalDistributedSystem system,
-        Set<InternalDistributedMember> initMembers) {
-      super(system, initMembers);
+        Set<InternalDistributedMember> recipients) {
+      super(system, recipients);
     }
 
     @Override
-    public void process(DistributionMessage msg) {
-      if (msg instanceof PartitionedRegionClearReplyMessage) {
-        Set<Integer> buckets = ((PartitionedRegionClearReplyMessage) msg).bucketsCleared;
+    public void process(DistributionMessage message) {
+      if (message instanceof PartitionedRegionClearReplyMessage) {
+        Set<Integer> buckets = ((PartitionedRegionClearReplyMessage) message).bucketsCleared;
         if (buckets != null) {
           bucketsCleared.addAll(buckets);
         }
       }
-      super.process(msg, true);
-    }
-  }
-
-  @Override
-  protected void sendReply(InternalDistributedMember member, int processorId,
-      DistributionManager distributionManager, ReplyException ex,
-      PartitionedRegion partitionedRegion, long startTime) {
-    if (partitionedRegion != null) {
-      if (startTime > 0) {
-        partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime);
-      }
+      process(message, true);
     }
-    PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage
-        .send(member, processorId, getReplySender(distributionManager), op, bucketsCleared,
-            ex);
   }
 
   public static class PartitionedRegionClearReplyMessage extends ReplyMessage {
 
     private Set<Integer> bucketsCleared;
 
-    private OperationType op;
+    private OperationType operationType;
 
     @Override
     public boolean getInlineProcess() {
       return true;
     }
 
+    public static void send(InternalDistributedMember recipient, int processorId,
+        ReplySender replySender, OperationType operationType, Set<Integer> bucketsCleared,
+        ReplyException replyException) {
+      Objects.requireNonNull(recipient, "partitionedRegionClearReplyMessage NULL reply message");
+
+      PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage replyMessage =
+          new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId,
+              operationType, bucketsCleared, replyException);
+
+      replyMessage.setRecipient(recipient);
+      replySender.putOutgoing(replyMessage);
+    }
+
     /**
      * Empty constructor to conform to DataSerializable interface
      */
-    public PartitionedRegionClearReplyMessage() {}
+    public PartitionedRegionClearReplyMessage() {
+      // Empty constructor to conform to DataSerializable interface
+    }
 
-    private PartitionedRegionClearReplyMessage(int processorId, OperationType op,
-        Set<Integer> bucketsCleared, ReplyException ex) {
-      super();
+    private PartitionedRegionClearReplyMessage(int processorId, OperationType operationType,
+        Set<Integer> bucketsCleared, ReplyException replyException) {
       this.bucketsCleared = bucketsCleared;
-      this.op = op;
+      this.operationType = operationType;
       setProcessorId(processorId);
-      setException(ex);
-    }
-
-    /** Send an ack */
-    public static void send(InternalDistributedMember recipient, int processorId, ReplySender dm,
-        OperationType op, Set<Integer> bucketsCleared, ReplyException ex) {
-
-      Assert.assertTrue(recipient != null, "partitionedRegionClearReplyMessage NULL reply message");
-
-      PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage m =
-          new PartitionedRegionClearMessage.PartitionedRegionClearReplyMessage(processorId, op,
-              bucketsCleared, ex);
-
-      m.setRecipient(recipient);
-      dm.putOutgoing(m);
+      setException(replyException);
     }
 
     /**
      * Processes this message. This method is invoked by the receiver of the message.
      *
-     * @param dm the distribution manager that is processing the message.
+     * @param distributionManager the distribution manager that is processing the message.
      */
     @Override
-    public void process(final DistributionManager dm, final ReplyProcessor21 rp) {
-      final long startTime = getTimestamp();
+    public void process(DistributionManager distributionManager,
+        ReplyProcessor21 replyProcessor21) {
+      long startTime = getTimestamp();
 
-      if (rp == null) {
-        if (LogService.getLogger().isTraceEnabled(LogMarker.DM_VERBOSE)) {
-          LogService.getLogger().trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
+      if (replyProcessor21 == null) {
+        if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
+          logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
         }
         return;
       }
 
-      rp.process(this);
+      replyProcessor21.process(this);
 
-      dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
+      distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
     }
 
     @Override
@@ -260,30 +256,29 @@ public class PartitionedRegionClearMessage extends PartitionMessage {
     }
 
     @Override
-    public void fromData(DataInput in,
-        DeserializationContext context) throws IOException, ClassNotFoundException {
+    public void fromData(DataInput in, DeserializationContext context)
+        throws IOException, ClassNotFoundException {
       super.fromData(in, context);
-      op = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
+      operationType = PartitionedRegionClearMessage.OperationType.values()[in.readByte()];
       bucketsCleared = DataSerializer.readObject(in);
     }
 
     @Override
-    public void toData(DataOutput out,
-        SerializationContext context) throws IOException {
+    public void toData(DataOutput out, SerializationContext context) throws IOException {
       super.toData(out, context);
-      out.writeByte(op.ordinal());
+      out.writeByte(operationType.ordinal());
       DataSerializer.writeObject(bucketsCleared, out);
     }
 
     @Override
     public String toString() {
-      StringBuffer sb = new StringBuffer();
-      sb.append("PartitionedRegionClearReplyMessage ")
-          .append("processorId=").append(this.processorId)
+      return new StringBuilder()
+          .append("PartitionedRegionClearReplyMessage ")
+          .append("processorId=").append(processorId)
           .append(" sender=").append(sender)
-          .append(" bucketsCleared ").append(this.bucketsCleared)
-          .append(" exception=").append(getException());
-      return sb.toString();
+          .append(" bucketsCleared ").append(bucketsCleared)
+          .append(" exception=").append(getException())
+          .toString();
     }
   }
 }
diff --git a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java
index 7d0db0a..5497028 100644
--- a/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java
+++ b/geode-cq/src/distributedTest/java/org/apache/geode/internal/cache/PartitionRegionClearHATest.java
@@ -220,7 +220,7 @@ public class PartitionRegionClearHATest implements Serializable {
       if (message instanceof PartitionedRegionClearMessage) {
         PartitionedRegionClearMessage clearMessage = (PartitionedRegionClearMessage) message;
         if (clearMessage
-            .getOp() == PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
+            .getOperationType() == PartitionedRegionClearMessage.OperationType.OP_UNLOCK_FOR_PR_CLEAR) {
           try {
             // count down to 1 so that we can go ahead and restart the server
             latch.countDown();