You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2017/10/05 23:45:40 UTC

[geode] 01/01: GEODE-3679 Forward client member id to other peers in transaction message.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch feature/GEODE-3679
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c34132f3f1cf99163ef4b0dcb1055fd0a73525ed
Author: eshu <es...@pivotal.io>
AuthorDate: Mon Oct 2 11:22:32 2017 -0700

    GEODE-3679 Forward client member id to other peers in transaction message.
    
      * Do not forward size request of a bucket region to transaction hosting node.
      * Move the method from LocalRegion to DistributedRegion.
      * Make region in TXRegionStub strongly typed to avoid some casting in the code.
---
 .../apache/geode/internal/cache/BucketRegion.java  |   6 +
 .../geode/internal/cache/DistributedRegion.java    |  27 ++++
 .../apache/geode/internal/cache/LocalRegion.java   |  74 ++++-----
 .../geode/internal/cache/PeerTXStateStub.java      |   4 +-
 .../cache/partitioned/PartitionMessage.java        |   7 +-
 .../cache/tx/AbstractPeerTXRegionStub.java         |   4 +-
 .../internal/cache/tx/DistributedTXRegionStub.java |  21 +--
 .../internal/cache/tx/PartitionedTXRegionStub.java |  31 ++--
 .../cache/ClientServerTransactionDUnitTest.java    | 167 +++++++++++++++++++++
 .../cache/tx/AbstractPeerTXRegionStubTest.java     |   2 +-
 10 files changed, 265 insertions(+), 78 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index c76e19e..cc84b9f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -1392,6 +1392,12 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   @Override
+  public int getRegionSize(DistributedMember target) {
+    // GEODE-3679. Do not forward the request again.
+    return getRegionSize();
+  }
+
+  @Override
   public void checkReadiness() {
     super.checkReadiness();
     if (isDestroyed()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index e882ed1..20d9f15 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -61,6 +61,8 @@ import org.apache.geode.cache.RegionMembershipListener;
 import org.apache.geode.cache.ResumptionAction;
 import org.apache.geode.cache.RoleException;
 import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.cache.TransactionDataNotColocatedException;
+import org.apache.geode.cache.TransactionException;
 import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionException;
@@ -101,6 +103,7 @@ import org.apache.geode.internal.cache.execute.LocalResultCollector;
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
 import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
 import org.apache.geode.internal.cache.lru.LRUEntry;
+import org.apache.geode.internal.cache.partitioned.RemoteSizeMessage;
 import org.apache.geode.internal.cache.persistence.CreatePersistentRegionProcessor;
 import org.apache.geode.internal.cache.persistence.PersistenceAdvisor;
 import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl;
@@ -3553,6 +3556,30 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
     return super.getOwnerForKey(key);
   }
 
+
+  /**
+   * Returns the size of this region.
+   *
+   * This is used in a transaction to find the size of the region on the transaction hosting node.
+   *
+   * @param target the host of the transaction TXState
+   * @return the number of entries in this region
+   */
+  public int getRegionSize(DistributedMember target) {
+    try {
+      RemoteSizeMessage.SizeResponse response =
+          RemoteSizeMessage.send(Collections.singleton(target), this);
+      return response.waitForSize();
+    } catch (RegionDestroyedException rde) {
+      throw new TransactionDataNotColocatedException(
+          LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION
+              .toLocalizedString(rde.getRegionFullPath()),
+          rde);
+    } catch (Exception e) {
+      throw new TransactionException(e);
+    }
+  }
+
   /**
    * Execute the provided named function in all locations that contain the given keys. So function
    * can be executed on just one fabric node, executed in parallel on a subset of nodes in parallel
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 47ce7e0..bc384c2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -17,6 +17,43 @@ package org.apache.geode.internal.cache;
 import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator;
 import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.AbstractSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.CancelException;
 import org.apache.geode.CopyHelper;
@@ -183,43 +220,6 @@ import org.apache.geode.pdx.JSONFormatter;
 import org.apache.geode.pdx.PdxInstance;
 import org.apache.logging.log4j.Logger;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.AbstractSet;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-
 /**
  * Implementation of a local scoped-region. Note that this class has a different meaning starting
  * with 3.0. In previous versions, a LocalRegion was the representation of a region in the VM.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
index 8e9ea89..80a2fc6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
@@ -169,14 +169,14 @@ public class PeerTXStateStub extends TXStateStub {
     TXRegionStub stub = null;
     if (region.getPartitionAttributes() != null) {
       // a partitioned region
-      stub = new PartitionedTXRegionStub(this, region);
+      stub = new PartitionedTXRegionStub(this, (PartitionedRegion) region);
     } else if (region.getScope().isLocal()) {
       // GEODE-3744 Local region should not be involved in a transaction on a PeerTXStateStub
       throw new TransactionException(
           "Local region " + region + " should not participate in a transaction not hosted locally");
     } else {
       // This is a dist region
-      stub = new DistributedTXRegionStub(this, region);
+      stub = new DistributedTXRegionStub(this, (DistributedRegion) region);
     }
     return stub;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
index 8c27107..64bebc4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
@@ -166,9 +166,10 @@ public abstract class PartitionMessage extends DistributionMessage
             "Sending remote txId even though transaction is local. This should never happen: txState="
                 + txState);
       }
-    }
-    if (txState != null && txState.isMemberIdForwardingRequired()) {
-      this.txMemberId = txState.getOriginatingMember();
+      // GEODE-3679. Even if TXStateProxy has a local transaction,
+      // we still need to forward original txMemberId to other nodes
+      // if the message does not start a new transaction.
+      this.txMemberId = txState.getTxId().getMemberId();
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java
index b4a2998..78273c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java
@@ -29,11 +29,9 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 public abstract class AbstractPeerTXRegionStub implements TXRegionStub {
 
   protected final TXStateStub state;
-  protected final LocalRegion region;
 
-  public AbstractPeerTXRegionStub(TXStateStub txstate, LocalRegion r) {
+  public AbstractPeerTXRegionStub(TXStateStub txstate) {
     this.state = txstate;
-    this.region = r;
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java
index 384135d..17bd83b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java
@@ -26,6 +26,7 @@ import org.apache.geode.cache.Region.Entry;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.DistributedPutAllOperation;
+import org.apache.geode.internal.cache.DistributedRegion;
 import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.KeyInfo;
@@ -45,17 +46,16 @@ import org.apache.geode.internal.cache.RemoteContainsKeyValueMessage.RemoteConta
 import org.apache.geode.internal.cache.RemoteOperationMessage.RemoteOperationResponse;
 import org.apache.geode.internal.cache.RemotePutMessage.PutResult;
 import org.apache.geode.internal.cache.RemotePutMessage.RemotePutResponse;
-import org.apache.geode.internal.cache.partitioned.RemoteSizeMessage;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
 
-  private final LocalRegion region;
+  private final DistributedRegion region;
 
-  public DistributedTXRegionStub(TXStateStub txstate, LocalRegion r) {
-    super(txstate, r);
+  public DistributedTXRegionStub(TXStateStub txstate, DistributedRegion r) {
+    super(txstate);
     this.region = r;
   }
 
@@ -224,18 +224,7 @@ public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
 
 
   public int entryCount() {
-    try {
-      RemoteSizeMessage.SizeResponse response =
-          RemoteSizeMessage.send(Collections.singleton(state.getTarget()), region);
-      return response.waitForSize();
-    } catch (RegionDestroyedException rde) {
-      throw new TransactionDataNotColocatedException(
-          LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION
-              .toLocalizedString(rde.getRegionFullPath()),
-          rde);
-    } catch (Exception e) {
-      throw new TransactionException(e);
-    }
+    return this.region.getRegionSize(this.state.getTarget());
   }
 
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
index 10ae7a5..5673c68 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
@@ -63,8 +63,11 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
    */
   private Map<Integer, Boolean> buckets = new HashMap<Integer, Boolean>();
 
-  public PartitionedTXRegionStub(TXStateStub txstate, LocalRegion r) {
-    super(txstate, r);
+  private final PartitionedRegion region;
+
+  public PartitionedTXRegionStub(TXStateStub txstate, PartitionedRegion r) {
+    super(txstate);
+    this.region = r;
   }
 
   public Map<Integer, Boolean> getBuckets() {
@@ -136,7 +139,7 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
   private boolean isKeyInNonColocatedBucket(KeyInfo keyInfo) {
     Map<Region<?, ?>, TXRegionStub> regionStubs = this.state.getRegionStubs();
     Collection<PartitionedRegion> colcatedRegions = (Collection<PartitionedRegion>) ColocationHelper
-        .getAllColocationRegions((PartitionedRegion) this.region).values();
+        .getAllColocationRegions(this.region).values();
     // get all colocated region buckets touched in the transaction
     for (PartitionedRegion colcatedRegion : colcatedRegions) {
       PartitionedTXRegionStub regionStub =
@@ -160,9 +163,8 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
 
 
   public Entry getEntry(KeyInfo keyInfo, boolean allowTombstones) {
-    PartitionedRegion pr = (PartitionedRegion) region;
     try {
-      Entry e = pr.getEntryRemotely((InternalDistributedMember) state.getTarget(),
+      Entry e = region.getEntryRemotely((InternalDistributedMember) state.getTarget(),
           keyInfo.getBucketId(), keyInfo.getKey(), false, allowTombstones);
       trackBucketForTx(keyInfo);
       return e;
@@ -238,9 +240,8 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
 
 
   public boolean containsKey(KeyInfo keyInfo) {
-    PartitionedRegion pr = (PartitionedRegion) region;
     try {
-      boolean retVal = pr.containsKeyRemotely((InternalDistributedMember) state.getTarget(),
+      boolean retVal = region.containsKeyRemotely((InternalDistributedMember) state.getTarget(),
           keyInfo.getBucketId(), keyInfo.getKey());
       trackBucketForTx(keyInfo);
       return retVal;
@@ -282,10 +283,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
 
 
   public boolean containsValueForKey(KeyInfo keyInfo) {
-    PartitionedRegion pr = (PartitionedRegion) region;
     try {
-      boolean retVal = pr.containsValueForKeyRemotely((InternalDistributedMember) state.getTarget(),
-          keyInfo.getBucketId(), keyInfo.getKey());
+      boolean retVal = region.containsValueForKeyRemotely(
+          (InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey());
       trackBucketForTx(keyInfo);
       return retVal;
     } catch (TransactionException e) {
@@ -318,10 +318,10 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
     Object retVal = null;
     final Object key = keyInfo.getKey();
     final Object callbackArgument = keyInfo.getCallbackArg();
-    PartitionedRegion pr = (PartitionedRegion) region;
     try {
-      retVal = pr.getRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(),
-          key, callbackArgument, peferCD, requestingClient, clientEvent, false);
+      retVal =
+          region.getRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(),
+              key, callbackArgument, peferCD, requestingClient, clientEvent, false);
     } catch (TransactionException e) {
       RuntimeException re = getTransactionException(keyInfo, e);
       re.initCause(e.getCause());
@@ -347,12 +347,11 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
 
 
   public Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstones) {
-    PartitionedRegion pr = (PartitionedRegion) region;
-    InternalDistributedMember primary = pr.getBucketPrimary(keyInfo.getBucketId());
+    InternalDistributedMember primary = region.getBucketPrimary(keyInfo.getBucketId());
     if (primary.equals(state.getTarget())) {
       return getEntry(keyInfo, allowTombstones);
     } else {
-      return pr.getSharedDataView().getEntry(keyInfo, pr, allowTombstones);
+      return region.getSharedDataView().getEntry(keyInfo, region, allowTombstones);
     }
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
index 96b89b9..c7ae750 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
@@ -4065,4 +4065,171 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
       Assert.fail("Unexpected exception while doing JTA Transaction2 ", e);
     }
   }
+
+  @Test
+  public void testPartitionMessageSetsClientMemberIdAsTxMemberId() {
+    Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM client = host.getVM(2);
+    int totalBuckets = 50;
+    String regionName = "region";
+
+    setupRegionForClientTransactions(totalBuckets, regionName, false, null);
+
+    client.invokeAsync(() -> doKeySetOpTransaction(1, regionName, totalBuckets, false, null));
+
+    // Should cause TXId(server1, 1) to be executed on server2
+    server1.invoke(() -> doPutOpTransaction(regionName, totalBuckets));
+  }
+
+  private void doKeySetOpTransaction(int firstGetKey, String regionName, int totalBuckets,
+      boolean withReplicateRegion, String region2Name) {
+    Region<Integer, String> region = getCache().getRegion(regionName);
+    TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+    txMgr.begin();
+    region.get(firstGetKey); // starts TXState on a server with the primary bucket of the key
+    verifyKeySetOp(totalBuckets, region);
+
+    if (withReplicateRegion) {
+      Region<Integer, String> region2 = getCache().getRegion(region2Name);
+      int num = totalBuckets + 1;
+      region2.put(num, "" + num);
+      verifyKeySetOp(num, region2);
+    }
+    txMgr.rollback();
+  }
+
+  private void verifyKeySetOp(int expected, Region<Integer, String> region) {
+    Set<Integer> keys = region.keySet();
+    assertEquals(expected, keys.size());
+    for (Integer key : keys) {
+      assertTrue(key <= expected);
+    }
+  }
+
+  private void doPutOpTransaction(String regionName, int totalBuckets) throws InterruptedException {
+    TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+    Region<Integer, String> region = getCache().getRegion(regionName);
+    txMgr.begin();
+    region.put(2, "NEWVALUE");
+    Thread.currentThread().sleep(100);
+    txMgr.commit();
+  }
+
+  private void doSizeOpTransactions(String regionName, int totalBuckets, String region2Name) {
+    for (int i = 1; i <= totalBuckets; i++) {
+      doSizeOpTransaction(i, regionName, totalBuckets, region2Name);
+    }
+  }
+
+  private void doSizeOpTransaction(int key, String regionName, int totalBuckets,
+      String region2Name) {
+    Region<Integer, String> region = getCache().getRegion(regionName);
+    Region<Integer, String> region2 = getCache().getRegion(region2Name);
+    TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+    txMgr.begin();
+    region.get(key); // starts TXState on different servers
+    assertEquals(totalBuckets, region.size());
+    int num = totalBuckets + 1;
+    region2.put(num, "" + num);
+    assertEquals(num, region2.size());
+    txMgr.rollback();
+  }
+
+  @Test
+  public void testSizeOpInTransaction() {
+    Host host = Host.getHost(0);
+    VM client = host.getVM(2);
+    String regionName = "region";
+    String region2Name = "region2";
+    int totalBuckets = 2;
+    setupRegionForClientTransactions(totalBuckets, regionName, true, region2Name);
+
+    client.invoke(() -> doSizeOpTransactions(regionName, totalBuckets, region2Name));
+  }
+
+  private void setupRegionForClientTransactions(int totalBuckets, String regionName,
+      boolean withReplicateRegion, String region2Name) {
+    Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    int port = createRegionsAndStartServer(server1, true);
+
+    createPRAndInitABucketOnServer1(totalBuckets, regionName, server1);
+
+    createPRAndInitOtherBucketsOnServer2(totalBuckets, regionName, server2);
+
+    if (withReplicateRegion) {
+      initReplicateRegion(totalBuckets, region2Name, server1, server2);
+    }
+
+    createRegionOnClient(regionName, withReplicateRegion, region2Name, client, port);
+  }
+
+  private void createRegionOnClient(String regionName, boolean withReplicateRegion,
+      String region2Name, VM client, int port) {
+    client.invoke(() -> {
+      createClient(port, regionName);
+      if (withReplicateRegion) {
+        createClient(port, region2Name);
+      }
+    });
+  }
+
+  private void initReplicateRegion(int totalBuckets, String region2Name, VM server1, VM server2) {
+    server1.invoke(() -> createReplicateRegion(region2Name));
+    server2.invoke(() -> {
+      createReplicateRegion(region2Name);
+      Region<Integer, String> region = getCache().getRegion(region2Name);
+      for (int i = totalBuckets; i > 0; i--) {
+        region.put(i, "" + i);
+      }
+    });
+  }
+
+  private void createPRAndInitOtherBucketsOnServer2(int totalBuckets, String regionName,
+      VM server2) {
+    createRegionOnServer(server2);
+    server2.invoke(() -> {
+      createSubscriptionRegion(false, regionName, 0, totalBuckets);
+      Region<Integer, String> region = getCache().getRegion(regionName);
+      for (int i = totalBuckets; i > 1; i--) {
+        region.put(i, "VALUE-" + i);
+      }
+    });
+  }
+
+  private void createPRAndInitABucketOnServer1(int totalBuckets, String regionName, VM server1) {
+    server1.invoke(() -> {
+      createSubscriptionRegion(false, regionName, 0, totalBuckets);
+      Region<Integer, String> region = getCache().getRegion(regionName);
+      // should create first bucket on server1
+      region.put(1, "VALUE-1");
+    });
+  }
+
+  @Test
+  public void testKeySetOpInTransaction() {
+    Host host = Host.getHost(0);
+    VM client = host.getVM(2);
+    String regionName = "region";
+    String region2Name = "region2";
+    int totalBuckets = 2;
+    setupRegionForClientTransactions(totalBuckets, regionName, true, region2Name);
+
+    client.invoke(() -> doKeySetOpTransactions(regionName, totalBuckets, true, region2Name));
+  }
+
+  private void doKeySetOpTransactions(String regionName, int totalBuckets,
+      boolean withReplicateRegion, String region2Name) {
+    for (int i = 1; i <= totalBuckets; i++) {
+      doKeySetOpTransaction(i, regionName, totalBuckets, withReplicateRegion, region2Name);
+    }
+  }
+
+  private void createReplicateRegion(String regionName) {
+    RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE);
+    Region<Integer, String> region = rf.create(regionName);
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java
index 80888ef..a7240bb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java
@@ -51,7 +51,7 @@ public class AbstractPeerTXRegionStubTest {
   private class TestingAbstractPeerTXRegionStub extends AbstractPeerTXRegionStub {
 
     private TestingAbstractPeerTXRegionStub(TXStateStub txState, LocalRegion r) {
-      super(txState, r);
+      super(txState);
     }
 
     @Override

-- 
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.