You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2019/02/08 17:48:11 UTC
[geode] branch feature/GEODE-6379 updated: GEODE-6379: track
departed members to avoid processing in-flight lock request
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-6379
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6379 by this push:
new 9808685 GEODE-6379: track departed members to avoid processing in-flight lock request
9808685 is described below
commit 9808685798bfb413597338f3c9491edfb64348fb
Author: eshu <es...@pivotal.io>
AuthorDate: Fri Feb 8 09:46:08 2019 -0800
GEODE-6379: track departed members to avoid processing in-flight lock request
---
.../distributed/internal/locks/DLockGrantor.java | 38 ++++++++-
.../internal/locks/DLockGrantorTest.java | 90 ++++++++++++++++++++++
2 files changed, 127 insertions(+), 1 deletion(-)
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
index 1ac754f..1b848d6 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/DLockGrantor.java
@@ -32,7 +32,9 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.internal.DistributionConfig;
@@ -146,6 +148,10 @@ public class DLockGrantor {
*/
private final TXReservationMgr resMgr = new TXReservationMgr(false);
+ private final Map<InternalDistributedMember, Long> membersDepartedTime = new HashMap();
+ private final int membersDepartedTimeKeptSize = 10;
+ private final long departedMemberKeptInMapMilliSeconds = 24 * 60 * 60 * 1000;
+
/**
* Enforces waiting until this grantor is initialized. Used to block all lock requests until
* INITIALIZED.
@@ -498,7 +504,13 @@ public class DLockGrantor {
}
DLockBatch batch = (DLockBatch) request.getObjectName();
- this.resMgr.makeReservation((IdentityArrayList) batch.getReqs());
+ synchronized (membersDepartedTime) {
+ // the transaction host/txLock requester has departed.
+ if (membersDepartedTime.containsKey(batch.getOwner())) {
+ throw new TransactionDataNodeHasDepartedException("Transaction host has been departed");
+ }
+ resMgr.makeReservation((IdentityArrayList) batch.getReqs());
+ }
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] granting {}",
batch.getBatchId());
@@ -522,6 +534,10 @@ public class DLockGrantor {
* @return lock batches that were created by owner
*/
public DLockBatch[] getLockBatches(InternalDistributedMember owner) {
+ // put owner into the map first so that no new threads will handle in-flight requests
+ // from the departed member to lock keys
+ recordMemberDepartedTime(owner);
+
// Key: Object batchId, Value: DLockBatch batch
synchronized (this.batchLocks) {
List batchList = new ArrayList();
@@ -535,6 +551,26 @@ public class DLockGrantor {
}
}
+ void recordMemberDepartedTime(InternalDistributedMember owner) {
+ synchronized (membersDepartedTime) {
+ if (membersDepartedTime.size() >= membersDepartedTimeKeptSize) {
+ // remove all departed members kept in map longer than 1 day
+ membersDepartedTime.entrySet()
+ .removeIf(entries -> entries.getValue() < departedMembersToBeExpiredTime());
+ }
+ membersDepartedTime.put(owner, System.currentTimeMillis());
+ }
+ }
+
+ long departedMembersToBeExpiredTime() {
+ return System.currentTimeMillis() - departedMemberKeptInMapMilliSeconds;
+ }
+
+ @VisibleForTesting
+ Map getMembersDepartedTimeRecords() {
+ return membersDepartedTime;
+ }
+
/**
* Get the batch for the given batchId (for example use a txLockId from TXLockBatch in order to
* update its participants). This operation was added as part of the solution to bug 32999.
diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java
new file mode 100644
index 0000000..5b0fa23
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/locks/DLockGrantorTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.locks;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.TransactionDataNodeHasDepartedException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+
+public class DLockGrantorTest {
+ private DLockService dLockService;
+ private DistributionManager distributionManager;
+ private DLockGrantor grantor;
+
+ @Before
+ public void setup() {
+ dLockService = mock(DLockService.class, RETURNS_DEEP_STUBS);
+ distributionManager = mock(DistributionManager.class);
+ when(dLockService.getDistributionManager()).thenReturn(distributionManager);
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+ when(distributionManager.getCancelCriterion()).thenReturn(cancelCriterion);
+ grantor = DLockGrantor.createGrantor(dLockService, 1);
+ }
+
+ @Test
+ public void handleLockBatchThrowsIfRequesterHasDeparted() {
+ DLockLessorDepartureHandler handler = mock(DLockLessorDepartureHandler.class);
+ InternalDistributedMember requester = mock(InternalDistributedMember.class);
+ DLockRequestProcessor.DLockRequestMessage requestMessage =
+ mock(DLockRequestProcessor.DLockRequestMessage.class);
+ when(dLockService.getDLockLessorDepartureHandler()).thenReturn(handler);
+ DLockBatch lockBatch = mock(DLockBatch.class);
+ when(requestMessage.getObjectName()).thenReturn(lockBatch);
+ when(lockBatch.getOwner()).thenReturn(requester);
+
+ grantor.makeReady(true);
+ grantor.getLockBatches(requester);
+
+ assertThatThrownBy(() -> grantor.handleLockBatch(requestMessage)).isInstanceOf(
+ TransactionDataNodeHasDepartedException.class);
+ }
+
+ @Test
+ public void recordMemberDepartedTimeRecords() {
+ InternalDistributedMember owner = mock(InternalDistributedMember.class);
+ grantor.recordMemberDepartedTime(owner);
+
+ assertThat(grantor.getMembersDepartedTimeRecords()).containsKey(owner);
+ }
+
+ @Test
+ public void recordMemberDepartedTimeRemovesExpiredMembers() {
+ DLockGrantor spy = spy(grantor);
+ for (int i = 0; i < 10; i++) {
+ spy.recordMemberDepartedTime(mock(InternalDistributedMember.class));
+ }
+ assertThat(spy.getMembersDepartedTimeRecords().size()).isEqualTo(10);
+
+ doReturn(System.currentTimeMillis() + 1).when(spy).departedMembersToBeExpiredTime();
+ InternalDistributedMember owner = mock(InternalDistributedMember.class);
+ spy.recordMemberDepartedTime(owner);
+
+ assertThat(spy.getMembersDepartedTimeRecords().size()).isEqualTo(1);
+ assertThat(spy.getMembersDepartedTimeRecords()).containsKey(owner);
+ }
+
+}