You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/02/08 17:48:11 UTC

[geode] branch feature/GEODE-6379 updated: GEODE-6379: track departed members to avoid processing in-flight lock request

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-6379
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6379 by this push:
     new 9808685  GEODE-6379: track departed members to avoid processing in-flight lock request
9808685 is described below

commit 9808685798bfb413597338f3c9491edfb64348fb
Author: eshu <es...@pivotal.io>
AuthorDate: Fri Feb 8 09:46:08 2019 -0800

    GEODE-6379: track departed members to avoid processing in-flight lock request
---
 .../distributed/internal/locks/DLockGrantor.java   | 38 ++++++++-
 .../internal/locks/DLockGrantorTest.java           | 90 ++++++++++++++++++++++
 2 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
index 1ac754f..1b848d6 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
@@ -32,7 +32,9 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.LockServiceDestroyedException;
 import org.apache.geode.distributed.internal.DistributionConfig;
@@ -146,6 +148,10 @@ public class DLockGrantor {
    */
   private final TXReservationMgr resMgr = new TXReservationMgr(false);
 
+  private final Map<InternalDistributedMember, Long> membersDepartedTime = new HashMap();
+  private final int membersDepartedTimeKeptSize = 10;
+  private final long departedMemberKeptInMapMilliSeconds = 24 * 60 * 60 * 1000;
+
   /**
    * Enforces waiting until this grantor is initialized. Used to block all lock requests until
    * INITIALIZED.
@@ -498,7 +504,13 @@ public class DLockGrantor {
         }
 
         DLockBatch batch = (DLockBatch) request.getObjectName();
-        this.resMgr.makeReservation((IdentityArrayList) batch.getReqs());
+        synchronized (membersDepartedTime) {
+          // the transaction host/txLock requester has departed.
+          if (membersDepartedTime.containsKey(batch.getOwner())) {
+            throw new TransactionDataNodeHasDepartedException("Transaction host has been departed");
+          }
+          resMgr.makeReservation((IdentityArrayList) batch.getReqs());
+        }
         if (isTraceEnabled_DLS) {
           logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] granting {}",
               batch.getBatchId());
@@ -522,6 +534,10 @@ public class DLockGrantor {
    * @return lock batches that were created by owner
    */
   public DLockBatch[] getLockBatches(InternalDistributedMember owner) {
+    // put owner into the map first so that no new threads will handle in-flight requests
+    // from the departed member to lock keys
+    recordMemberDepartedTime(owner);
+
     // Key: Object batchId, Value: DLockBatch batch
     synchronized (this.batchLocks) {
       List batchList = new ArrayList();
@@ -535,6 +551,26 @@ public class DLockGrantor {
     }
   }
 
+  void recordMemberDepartedTime(InternalDistributedMember owner) {
+    synchronized (membersDepartedTime) {
+      if (membersDepartedTime.size() >= membersDepartedTimeKeptSize) {
+        // remove all departed members kept in map longer than 1 day
+        membersDepartedTime.entrySet()
+            .removeIf(entries -> entries.getValue() < departedMembersToBeExpiredTime());
+      }
+      membersDepartedTime.put(owner, System.currentTimeMillis());
+    }
+  }
+
+  long departedMembersToBeExpiredTime() {
+    return System.currentTimeMillis() - departedMemberKeptInMapMilliSeconds;
+  }
+
+  @VisibleForTesting
+  Map getMembersDepartedTimeRecords() {
+    return membersDepartedTime;
+  }
+
   /**
    * Get the batch for the given batchId (for example use a txLockId from TXLockBatch in order to
    * update its participants). This operation was added as part of the solution to bug 32999.
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java
new file mode 100644
index 0000000..5b0fa23
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.locks;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+public class DLockGrantorTest {
+  private DLockService dLockService;
+  private DistributionManager distributionManager;
+  private DLockGrantor grantor;
+
+  @Before
+  public void setup() {
+    dLockService = mock(DLockService.class, RETURNS_DEEP_STUBS);
+    distributionManager = mock(DistributionManager.class);
+    when(dLockService.getDistributionManager()).thenReturn(distributionManager);
+    CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+    when(distributionManager.getCancelCriterion()).thenReturn(cancelCriterion);
+    grantor = DLockGrantor.createGrantor(dLockService, 1);
+  }
+
+  @Test
+  public void handleLockBatchThrowsIfRequesterHasDeparted() {
+    DLockLessorDepartureHandler handler = mock(DLockLessorDepartureHandler.class);
+    InternalDistributedMember requester = mock(InternalDistributedMember.class);
+    DLockRequestProcessor.DLockRequestMessage requestMessage =
+        mock(DLockRequestProcessor.DLockRequestMessage.class);
+    when(dLockService.getDLockLessorDepartureHandler()).thenReturn(handler);
+    DLockBatch lockBatch = mock(DLockBatch.class);
+    when(requestMessage.getObjectName()).thenReturn(lockBatch);
+    when(lockBatch.getOwner()).thenReturn(requester);
+
+    grantor.makeReady(true);
+    grantor.getLockBatches(requester);
+
+    assertThatThrownBy(() -> grantor.handleLockBatch(requestMessage)).isInstanceOf(
+        TransactionDataNodeHasDepartedException.class);
+  }
+
+  @Test
+  public void recordMemberDepartedTimeRecords() {
+    InternalDistributedMember owner = mock(InternalDistributedMember.class);
+    grantor.recordMemberDepartedTime(owner);
+
+    assertThat(grantor.getMembersDepartedTimeRecords()).containsKey(owner);
+  }
+
+  @Test
+  public void recordMemberDepartedTimeRemovesExpiredMembers() {
+    DLockGrantor spy = spy(grantor);
+    for (int i = 0; i < 10; i++) {
+      spy.recordMemberDepartedTime(mock(InternalDistributedMember.class));
+    }
+    assertThat(spy.getMembersDepartedTimeRecords().size()).isEqualTo(10);
+
+    doReturn(System.currentTimeMillis() + 1).when(spy).departedMembersToBeExpiredTime();
+    InternalDistributedMember owner = mock(InternalDistributedMember.class);
+    spy.recordMemberDepartedTime(owner);
+
+    assertThat(spy.getMembersDepartedTimeRecords().size()).isEqualTo(1);
+    assertThat(spy.getMembersDepartedTimeRecords()).containsKey(owner);
+  }
+
+}