Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/02/23 22:30:18 UTC

[GitHub] [geode] pivotal-eshu opened a new pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

pivotal-eshu opened a new pull request #6051:
URL: https://github.com/apache/geode/pull/6051


     * Do not process DistributedCacheOperation in-line if the region scope is DISTRIBUTED_NO_ACK.
     * This avoids a potential deadlock: the p2p reader thread could block on the
       synchronized lock of an entry and then be unable to handle the DLock GRANT
       message that the thread holding that lock is waiting for.
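
The hand-off described above can be illustrated in isolation. The following is a minimal sketch using only `java.util.concurrent`, not Geode code: the single-threaded `reader` executor stands in for the p2p reader thread, `workers` stands in for a separate pool, and `entryLock`, `grantReceived`, and the class name are hypothetical names chosen for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class HandOffSketch {

  /** Returns true if the simulated cache operation finished before the timeout. */
  static boolean operationCompletes(boolean handOff) {
    Object entryLock = new Object();
    CountDownLatch lockHeld = new CountDownLatch(1);
    CountDownLatch grantReceived = new CountDownLatch(1);
    CountDownLatch operationDone = new CountDownLatch(1);

    ExecutorService reader = Executors.newSingleThreadExecutor(); // stand-in for the reader thread
    ExecutorService workers = Executors.newCachedThreadPool(); // stand-in for a separate pool

    // This thread holds the entry lock until the "grant" message has been handled.
    Thread holder = new Thread(() -> {
      synchronized (entryLock) {
        lockHeld.countDown();
        awaitQuietly(grantReceived);
      }
    });
    holder.start();
    awaitQuietly(lockHeld);

    // The cache operation needs the same entry lock.
    Runnable cacheOperation = () -> {
      synchronized (entryLock) {
        operationDone.countDown();
      }
    };

    // Message 1: the cache operation, processed in-line or handed off.
    reader.execute(() -> {
      if (handOff) {
        workers.execute(cacheOperation);
      } else {
        cacheOperation.run(); // blocks the reader while the lock is held
      }
    });
    // Message 2: the grant, which only the reader can deliver.
    reader.execute(grantReceived::countDown);

    boolean completed;
    try {
      completed = operationDone.await(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      completed = false;
    }

    // Clean up: release the holder so the in-line case can also unwind.
    grantReceived.countDown();
    reader.shutdown();
    workers.shutdown();
    return completed;
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println("hand-off completes: " + operationCompletes(true));
    System.out.println("in-line completes:  " + operationCompletes(false));
  }
}
```

In this sketch the in-line variant stalls until the timeout because the grant is queued behind the blocked operation, while the hand-off variant leaves the reader free to deliver it. It says nothing about which Geode pool is the right choice for the hand-off.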
   
   Thank you for submitting a contribution to Apache Geode.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `develop`)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   - [ ] Does `gradlew build` run cleanly?
   
   - [ ] Have you written or updated unit tests to verify your changes?
   
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   
   ### Note:
   Once the PR is submitted, please check Concourse for build issues and
   submit an update to your PR as soon as possible. If you need help, please send an
   email to dev@geode.apache.org.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jchen21 commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
jchen21 commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r583317242



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
##########
@@ -1107,6 +1107,10 @@ protected void process(final ClusterDistributionManager dm) {
 
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
+        if (lclRgn != null && lclRgn.getScope().isDistributedNoAck()) {
+          dm.getExecutors().getWaitingThreadPool().execute(() -> basicProcess(dm, lclRgn));

Review comment:
       Good point. In `basicProcess()`, some exceptions are caught and set flags, but they are logged only when debug is enabled. Will the new thread fail silently on such exceptions if debug is not enabled?
   The same applies to `process()`: whether its exceptions are logged also depends on debug being enabled. I am not sure whether this is by design. Without the log message, failures are hard to analyze when debug is not enabled.
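
The effect being discussed, a flag set in a `catch` block while the log call is guarded by the debug level, can be reproduced with a small sketch. This one uses `java.util.logging` as a stand-in for Geode's logger, `IllegalStateException` as a stand-in for `CancelException`, and a hypothetical `alsoLogAtInfo` switch to show the alternative of an unconditional info-level message. None of these names come from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

class GuardedLoggingSketch {
  private final Logger logger;
  final List<String> logged = new ArrayList<>(); // captured log output
  boolean closed;

  GuardedLoggingSketch(Level level) {
    logger = Logger.getAnonymousLogger();
    logger.setUseParentHandlers(false); // keep the demo output out of the console
    logger.setLevel(level);
    logger.addHandler(new Handler() {
      @Override
      public void publish(LogRecord record) {
        logged.add(record.getLevel() + ": " + record.getMessage());
      }

      @Override
      public void flush() {}

      @Override
      public void close() {}
    });
  }

  void process(Runnable work, boolean alsoLogAtInfo) {
    try {
      work.run();
    } catch (IllegalStateException e) { // stand-in for CancelException
      closed = true; // the flag is always set
      if (logger.isLoggable(Level.FINE)) { // debug-style guard
        logger.fine("Cancelled: nothing to do");
      }
      if (alsoLogAtInfo) { // hypothetical alternative: always leave a trace
        logger.info("Cancelled: nothing to do");
      }
    }
  }

  public static void main(String[] args) {
    Runnable failing = () -> {
      throw new IllegalStateException("cancelled");
    };
    GuardedLoggingSketch quiet = new GuardedLoggingSketch(Level.INFO);
    quiet.process(failing, false);
    System.out.println("closed=" + quiet.closed + ", logged=" + quiet.logged);

    GuardedLoggingSketch loud = new GuardedLoggingSketch(Level.INFO);
    loud.process(failing, true);
    System.out.println("closed=" + loud.closed + ", logged=" + loud.logged);
  }
}
```

With the logger at `INFO`, the first call sets `closed` but records nothing, which is the "silent" behavior in question. Whether an info-level message is worth the extra log volume is the trade-off debated elsewhere in this thread.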







[GitHub] [geode] kirklund edited a comment on pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
kirklund edited a comment on pull request #6051:
URL: https://github.com/apache/geode/pull/6051#issuecomment-784606768









[GitHub] [geode] jchen21 commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
jchen21 commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r582416387



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
##########
@@ -1107,6 +1107,10 @@ protected void process(final ClusterDistributionManager dm) {
 
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
+        if (lclRgn != null && lclRgn.getScope().isDistributedNoAck()) {
+          dm.getExecutors().getWaitingThreadPool().execute(() -> basicProcess(dm, lclRgn));

Review comment:
       I have the same concern. If a thread throws an exception that is not caught, the exception silently kills the thread, which can make it very hard to diagnose what is happening. The current thread has a few `catch` blocks and a `finally` block (lines 1115-1147). If the newly spawned thread does not have these `catch` and `finally` blocks, I would like to understand why.
   
   Another question: why use `getWaitingThreadPool()` rather than another pool, e.g. `getThreadPool()`?
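
One common way to address this kind of concern is to wrap the task handed to the pool so that failures are recorded and cleanup still runs. The sketch below is an illustration under assumptions, not the PR's code: `guard`, `failure`, and `cleanedUp` are hypothetical names, and a plain `ExecutorService` stands in for Geode's executors.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class GuardedTaskSketch {
  final AtomicReference<Throwable> failure = new AtomicReference<>();
  final AtomicBoolean cleanedUp = new AtomicBoolean();

  /** Wraps a task so that a failure is recorded and cleanup always runs. */
  Runnable guard(Runnable task) {
    return () -> {
      try {
        task.run();
      } catch (RuntimeException e) {
        failure.set(e); // a real implementation might log or send an error reply here
      } finally {
        cleanedUp.set(true); // stands in for whatever the finally block must do
      }
    };
  }

  /** Runs a task that always fails on a pool thread and returns the recorded state. */
  static GuardedTaskSketch runFailingTask() {
    GuardedTaskSketch sketch = new GuardedTaskSketch();
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(sketch.guard(() -> {
      throw new IllegalStateException("simulated failure");
    }));
    pool.shutdown();
    try {
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return sketch;
  }

  public static void main(String[] args) {
    GuardedTaskSketch sketch = runFailingTask();
    System.out.println("recorded failure: " + sketch.failure.get());
    System.out.println("cleanup ran: " + sketch.cleanedUp.get());
  }
}
```

Without such a wrapper, an exception thrown by a task passed to `execute()` terminates the worker thread and is only reported through the thread's uncaught exception handler, which is easy to miss in a busy log.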







[GitHub] [geode] pivotal-eshu commented on pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on pull request #6051:
URL: https://github.com/apache/geode/pull/6051#issuecomment-824209596


   Since this fix causes some behavior change, I will try out another fix.





[GitHub] [geode] pivotal-eshu commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r581494208



##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
##########
@@ -17,22 +17,68 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.OperationExecutors;
+import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.test.fake.Fakes;
 
 public class DistributedCacheOperationTest {
+  private TestCacheOperationMessage message;
+  private InternalDistributedMember sender;
+  private ClusterDistributionManager dm;
+  private LocalRegion region;
+  private VersionTag<?> versionTag;
+  private Scope scope;
+  private OperationExecutors executors;
+  private final int processorId = 1;
+
+  @Before
+  public void setup() {
+    message = spy(new TestCacheOperationMessage());
+    sender = mock(InternalDistributedMember.class);
+    versionTag = mock(VersionTag.class);
+
+    GemFireCacheImpl cache = Fakes.cache();

Review comment:
       Removed the use of `Fakes`.
   The product code's method signature uses `LocalRegion`, so switching to `InternalRegion` may require some casting. I will leave it as is unless we need to change the product code.







[GitHub] [geode] pivotal-eshu commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r582427022



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
##########
@@ -1107,6 +1107,10 @@ protected void process(final ClusterDistributionManager dm) {
 
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
+        if (lclRgn != null && lclRgn.getScope().isDistributedNoAck()) {
+          dm.getExecutors().getWaitingThreadPool().execute(() -> basicProcess(dm, lclRgn));

Review comment:
       The `basicProcess` call (which uses the waiting thread pool for distributed-no-ack regions) also catches and handles `CancelException`:
   catch (RegionDestroyedException ignore) {
     this.closed = true;
     if (logger.isDebugEnabled()) {
       logger.debug("{} Region destroyed: nothing to do", this);
     }
   } catch (CancelException ignore) {
     this.closed = true;
     if (logger.isDebugEnabled()) {
       logger.debug("{} Cancelled: nothing to do", this);
     }
   }







[GitHub] [geode] bschuchardt commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
bschuchardt commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r582366653



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
##########
@@ -1107,6 +1107,10 @@ protected void process(final ClusterDistributionManager dm) {
 
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
+        if (lclRgn != null && lclRgn.getScope().isDistributedNoAck()) {
+          dm.getExecutors().getWaitingThreadPool().execute(() -> basicProcess(dm, lclRgn));

Review comment:
       This previously caught `CancelException` and set `this.closed = true`. That is no longer done for distributed-no-ack regions. Why is that okay?







[GitHub] [geode] lgtm-com[bot] commented on pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #6051:
URL: https://github.com/apache/geode/pull/6051#issuecomment-784649285


   This pull request **fixes 2 alerts** when merging 7addfe4c2c220d9963ec2060191547e329d800a2 into 3a21c2852746f19755ac302f584ca5b8908eae2e - [view on LGTM.com](https://lgtm.com/projects/g/apache/geode/rev/pr-f29a55f2fbac14a855834134cda854584c6d7ce7)
   
   **fixed alerts:**
   
   * 2 for Dereferenced variable may be null





[GitHub] [geode] pivotal-eshu commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r582346253



##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
##########
@@ -72,11 +114,112 @@ public void endOperationIsInvokedOnDistributionError() {
     assertTrue(operation.endOperationInvoked);
   }
 
+  @Test
+  public void processReplacesVersionTagNullIDs() {
+    message.process(dm);
+
+    verify(versionTag).replaceNullIDs(sender);
+  }
+
+  @Test
+  public void processSendsReplyIfAdminDM() {
+    when(dm.getDMType()).thenReturn(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE);
+
+    message.process(dm);
+
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyIfLocalRegionIsNull() {
+    doReturn(null).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    assertThat(message.closed).isTrue();
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyIfGotCacheClosedException() {
+    CacheClosedException cacheClosedException = new CacheClosedException();
+    doThrow(cacheClosedException).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    assertThat(message.closed).isTrue();

Review comment:
       I believe there was a requirement that all new tests use AssertJ assertions.
   







[GitHub] [geode] pivotal-eshu closed pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
pivotal-eshu closed pull request #6051:
URL: https://github.com/apache/geode/pull/6051


   





[GitHub] [geode] gesterzhou commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
gesterzhou commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r608050930



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
##########
@@ -1107,6 +1107,10 @@ protected void process(final ClusterDistributionManager dm) {
 
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
+        if (lclRgn != null && lclRgn.getScope().isDistributedNoAck()) {
+          dm.getExecutors().getWaitingThreadPool().execute(() -> basicProcess(dm, lclRgn));

Review comment:
       But for this corner case, I think an info-level log is necessary.







[GitHub] [geode] kirklund commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
kirklund commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r581467445



##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
##########
@@ -17,22 +17,68 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.HashMap;
 import java.util.Map;
 
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
+import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.CacheEvent;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.OperationExecutors;
+import org.apache.geode.distributed.internal.ReplyException;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.DistributedCacheOperation.CacheOperationMessage;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
+import org.apache.geode.internal.cache.versions.VersionTag;
+import org.apache.geode.test.fake.Fakes;
 
 public class DistributedCacheOperationTest {
+  private TestCacheOperationMessage message;
+  private InternalDistributedMember sender;
+  private ClusterDistributionManager dm;
+  private LocalRegion region;
+  private VersionTag<?> versionTag;
+  private Scope scope;
+  private OperationExecutors executors;
+  private final int processorId = 1;
+
+  @Before
+  public void setup() {
+    message = spy(new TestCacheOperationMessage());
+    sender = mock(InternalDistributedMember.class);
+    versionTag = mock(VersionTag.class);
+
+    GemFireCacheImpl cache = Fakes.cache();

Review comment:
       I'd really like to see us stop using `Fakes.cache()` in new unit tests so we can get rid of it. Can you please see if you can remove this and mock everything `DistributedCacheOperation` needs directly? Also, please try to use `InternalCache` and `InternalRegion` instead of `GemFireCacheImpl` and `LocalRegion`. Let me know if you'd like some help or want me to pair with you.







[GitHub] [geode] pivotal-eshu commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r585803394



##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
##########
@@ -1107,6 +1107,10 @@ protected void process(final ClusterDistributionManager dm) {
 
         final LocalRegion lclRgn = getLocalRegionForProcessing(dm);
         sendReply = false;
+        if (lclRgn != null && lclRgn.getScope().isDistributedNoAck()) {
+          dm.getExecutors().getWaitingThreadPool().execute(() -> basicProcess(dm, lclRgn));

Review comment:
       I believe it is intentional. We do not want to pollute the user logging.







[GitHub] [geode] lgtm-com[bot] commented on pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
lgtm-com[bot] commented on pull request #6051:
URL: https://github.com/apache/geode/pull/6051#issuecomment-784580721


   This pull request **fixes 2 alerts** when merging 4a854a6167ae6aa497348b2ad971deb72706b114 into 3a21c2852746f19755ac302f584ca5b8908eae2e - [view on LGTM.com](https://lgtm.com/projects/g/apache/geode/rev/pr-1683c4857c7e11b7f513a822db5bec3989d91494)
   
   **fixed alerts:**
   
   * 2 for Dereferenced variable may be null





[GitHub] [geode] kirklund commented on pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
kirklund commented on pull request #6051:
URL: https://github.com/apache/geode/pull/6051#issuecomment-784606768


   The changes look great. The only change I'd really like to see is to avoid the use of `Fakes.cache()`.





[GitHub] [geode] DonalEvans commented on a change in pull request #6051: GEODE-8862: Uses another thread to process DistributedCacheOperation

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #6051:
URL: https://github.com/apache/geode/pull/6051#discussion_r582199184



##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
##########
@@ -72,11 +114,112 @@ public void endOperationIsInvokedOnDistributionError() {
     assertTrue(operation.endOperationInvoked);
   }
 
+  @Test
+  public void processReplacesVersionTagNullIDs() {
+    message.process(dm);
+
+    verify(versionTag).replaceNullIDs(sender);
+  }
+
+  @Test
+  public void processSendsReplyIfAdminDM() {
+    when(dm.getDMType()).thenReturn(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE);
+
+    message.process(dm);
+
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyIfLocalRegionIsNull() {
+    doReturn(null).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    assertThat(message.closed).isTrue();
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),

Review comment:
       It's not necessary to wrap the arguments here with `eq()`, so this can be simplified.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
##########
@@ -72,11 +114,112 @@ public void endOperationIsInvokedOnDistributionError() {
     assertTrue(operation.endOperationInvoked);
   }
 
+  @Test
+  public void processReplacesVersionTagNullIDs() {
+    message.process(dm);
+
+    verify(versionTag).replaceNullIDs(sender);
+  }
+
+  @Test
+  public void processSendsReplyIfAdminDM() {
+    when(dm.getDMType()).thenReturn(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE);
+
+    message.process(dm);
+
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),

Review comment:
       It's not necessary to wrap the arguments here with `eq()`, so this can be simplified.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
##########
@@ -72,11 +114,112 @@ public void endOperationIsInvokedOnDistributionError() {
     assertTrue(operation.endOperationInvoked);
   }
 
+  @Test
+  public void processReplacesVersionTagNullIDs() {
+    message.process(dm);
+
+    verify(versionTag).replaceNullIDs(sender);
+  }
+
+  @Test
+  public void processSendsReplyIfAdminDM() {
+    when(dm.getDMType()).thenReturn(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE);
+
+    message.process(dm);
+
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyIfLocalRegionIsNull() {
+    doReturn(null).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    assertThat(message.closed).isTrue();
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyIfGotCacheClosedException() {
+    CacheClosedException cacheClosedException = new CacheClosedException();
+    doThrow(cacheClosedException).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    assertThat(message.closed).isTrue();

Review comment:
       This would be simpler as `assertTrue()`.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
##########
@@ -72,11 +114,112 @@ public void endOperationIsInvokedOnDistributionError() {
     assertTrue(operation.endOperationInvoked);
   }
 
+  @Test
+  public void processReplacesVersionTagNullIDs() {
+    message.process(dm);
+
+    verify(versionTag).replaceNullIDs(sender);
+  }
+
+  @Test
+  public void processSendsReplyIfAdminDM() {
+    when(dm.getDMType()).thenReturn(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE);
+
+    message.process(dm);
+
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyIfLocalRegionIsNull() {
+    doReturn(null).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    assertThat(message.closed).isTrue();
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyIfGotCacheClosedException() {
+    CacheClosedException cacheClosedException = new CacheClosedException();
+    doThrow(cacheClosedException).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    assertThat(message.closed).isTrue();
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyExceptionIfGotRuntimeException() {
+    RuntimeException exception = new RuntimeException();
+    doThrow(exception).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    verify(message, never()).basicProcess(dm, region);
+    ArgumentCaptor<ReplyException> captor = ArgumentCaptor.forClass(ReplyException.class);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        captor.capture(),
+        eq(dm));
+    assertThat(captor.getValue().getCause()).isSameAs(exception);
+  }
+
+  @Test
+  public void processPerformsBasicProcessIfNotDistributedNoAck() {
+    when(scope.isDistributedNoAck()).thenReturn(false);
+
+    message.process(dm);
+
+    verify(message).basicProcess(dm, region);
+    verify(executors, never()).getWaitingThreadPool();
+  }
+
+  @Test
+  public void processUsesWaitingThreadPoolIfDistributedNoAck() {
+    when(scope.isDistributedNoAck()).thenReturn(true);
+
+    message.process(dm);
+
+    verify(executors).getWaitingThreadPool();
+  }
+
+  @Test
+  public void processDoesNotSendReplyIfDistributedNoAck() {
+    when(scope.isDistributedNoAck()).thenReturn(true);
+
+    message.process(dm);
+
+    verify(message, never()).sendReply(
+        eq(sender),

Review comment:
       It's not necessary to use `eq()` for these arguments.

##########
File path: geode-core/src/test/java/org/apache/geode/internal/cache/DistributedCacheOperationTest.java
##########
@@ -72,11 +114,112 @@ public void endOperationIsInvokedOnDistributionError() {
     assertTrue(operation.endOperationInvoked);
   }
 
+  @Test
+  public void processReplacesVersionTagNullIDs() {
+    message.process(dm);
+
+    verify(versionTag).replaceNullIDs(sender);
+  }
+
+  @Test
+  public void processSendsReplyIfAdminDM() {
+    when(dm.getDMType()).thenReturn(ClusterDistributionManager.ADMIN_ONLY_DM_TYPE);
+
+    message.process(dm);
+
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyIfLocalRegionIsNull() {
+    doReturn(null).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    assertThat(message.closed).isTrue();
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),
+        eq(processorId),
+        eq(null),
+        eq(dm));
+  }
+
+  @Test
+  public void processSendsReplyIfGotCacheClosedException() {
+    CacheClosedException cacheClosedException = new CacheClosedException();
+    doThrow(cacheClosedException).when(message).getLocalRegionForProcessing(dm);
+
+    message.process(dm);
+
+    assertThat(message.closed).isTrue();
+    verify(message, never()).basicProcess(dm, region);
+    verify(message).sendReply(
+        eq(sender),

Review comment:
       It's not necessary to use `eq()` here.



