You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2017/10/05 23:45:40 UTC
[geode] 01/01: GEODE-3679 Forward client member id to other peers
in transaction message.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-3679
in repository https://gitbox.apache.org/repos/asf/geode.git
commit c34132f3f1cf99163ef4b0dcb1055fd0a73525ed
Author: eshu <es...@pivotal.io>
AuthorDate: Mon Oct 2 11:22:32 2017 -0700
GEODE-3679 Forward client member id to other peers in transaction message.
* Do not forward a size request for a bucket region to the transaction hosting node.
* Move the method from LocalRegion to DistributedRegion.
* Make the region field in the TXRegionStub implementations strongly typed to avoid some casting in the code.
---
.../apache/geode/internal/cache/BucketRegion.java | 6 +
.../geode/internal/cache/DistributedRegion.java | 27 ++++
.../apache/geode/internal/cache/LocalRegion.java | 74 ++++-----
.../geode/internal/cache/PeerTXStateStub.java | 4 +-
.../cache/partitioned/PartitionMessage.java | 7 +-
.../cache/tx/AbstractPeerTXRegionStub.java | 4 +-
.../internal/cache/tx/DistributedTXRegionStub.java | 21 +--
.../internal/cache/tx/PartitionedTXRegionStub.java | 31 ++--
.../cache/ClientServerTransactionDUnitTest.java | 167 +++++++++++++++++++++
.../cache/tx/AbstractPeerTXRegionStubTest.java | 2 +-
10 files changed, 265 insertions(+), 78 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index c76e19e..cc84b9f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -1392,6 +1392,12 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
@Override
+ public int getRegionSize(DistributedMember target) {
+ // GEODE-3679. Do not forward the request again.
+ return getRegionSize();
+ }
+
+ @Override
public void checkReadiness() {
super.checkReadiness();
if (isDestroyed()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index e882ed1..20d9f15 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -61,6 +61,8 @@ import org.apache.geode.cache.RegionMembershipListener;
import org.apache.geode.cache.ResumptionAction;
import org.apache.geode.cache.RoleException;
import org.apache.geode.cache.TimeoutException;
+import org.apache.geode.cache.TransactionDataNotColocatedException;
+import org.apache.geode.cache.TransactionException;
import org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueImpl;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
@@ -101,6 +103,7 @@ import org.apache.geode.internal.cache.execute.LocalResultCollector;
import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
import org.apache.geode.internal.cache.lru.LRUEntry;
+import org.apache.geode.internal.cache.partitioned.RemoteSizeMessage;
import org.apache.geode.internal.cache.persistence.CreatePersistentRegionProcessor;
import org.apache.geode.internal.cache.persistence.PersistenceAdvisor;
import org.apache.geode.internal.cache.persistence.PersistenceAdvisorImpl;
@@ -3553,6 +3556,30 @@ public class DistributedRegion extends LocalRegion implements CacheDistributionA
return super.getOwnerForKey(key);
}
+
+ /**
+ * Returns the size in this region.
+ *
+ * This is used in a transaction to find the size of the region on the transaction hosting node.
+ *
+ * @param target the host of the transaction TXState
+ * @return the number of entries in this region
+ */
+ public int getRegionSize(DistributedMember target) {
+ try {
+ RemoteSizeMessage.SizeResponse response =
+ RemoteSizeMessage.send(Collections.singleton(target), this);
+ return response.waitForSize();
+ } catch (RegionDestroyedException rde) {
+ throw new TransactionDataNotColocatedException(
+ LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION
+ .toLocalizedString(rde.getRegionFullPath()),
+ rde);
+ } catch (Exception e) {
+ throw new TransactionException(e);
+ }
+ }
+
/**
* Execute the provided named function in all locations that contain the given keys. So function
* can be executed on just one fabric node, executed in parallel on a subset of nodes in parallel
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 47ce7e0..bc384c2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -17,6 +17,43 @@ package org.apache.geode.internal.cache;
import static org.apache.geode.internal.lang.SystemUtils.getLineSeparator;
import static org.apache.geode.internal.offheap.annotations.OffHeapIdentifier.ENTRY_EVENT_NEW_VALUE;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.AbstractSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.transaction.RollbackException;
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.CopyHelper;
@@ -183,43 +220,6 @@ import org.apache.geode.pdx.JSONFormatter;
import org.apache.geode.pdx.PdxInstance;
import org.apache.logging.log4j.Logger;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.AbstractSet;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-
/**
* Implementation of a local scoped-region. Note that this class has a different meaning starting
* with 3.0. In previous versions, a LocalRegion was the representation of a region in the VM.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
index 8e9ea89..80a2fc6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PeerTXStateStub.java
@@ -169,14 +169,14 @@ public class PeerTXStateStub extends TXStateStub {
TXRegionStub stub = null;
if (region.getPartitionAttributes() != null) {
// a partitioned region
- stub = new PartitionedTXRegionStub(this, region);
+ stub = new PartitionedTXRegionStub(this, (PartitionedRegion) region);
} else if (region.getScope().isLocal()) {
// GEODE-3744 Local region should not be involved in a transaction on a PeerTXStateStub
throw new TransactionException(
"Local region " + region + " should not participate in a transaction not hosted locally");
} else {
// This is a dist region
- stub = new DistributedTXRegionStub(this, region);
+ stub = new DistributedTXRegionStub(this, (DistributedRegion) region);
}
return stub;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
index 8c27107..64bebc4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
@@ -166,9 +166,10 @@ public abstract class PartitionMessage extends DistributionMessage
"Sending remote txId even though transaction is local. This should never happen: txState="
+ txState);
}
- }
- if (txState != null && txState.isMemberIdForwardingRequired()) {
- this.txMemberId = txState.getOriginatingMember();
+ // GEODE-3679. Even if TXStateProxy has a local transaction,
+ // we still need to forward original txMemberId to other nodes
+ // if the message does not start a new transaction.
+ this.txMemberId = txState.getTxId().getMemberId();
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java
index b4a2998..78273c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStub.java
@@ -29,11 +29,9 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
public abstract class AbstractPeerTXRegionStub implements TXRegionStub {
protected final TXStateStub state;
- protected final LocalRegion region;
- public AbstractPeerTXRegionStub(TXStateStub txstate, LocalRegion r) {
+ public AbstractPeerTXRegionStub(TXStateStub txstate) {
this.state = txstate;
- this.region = r;
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java
index 384135d..17bd83b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/DistributedTXRegionStub.java
@@ -26,6 +26,7 @@ import org.apache.geode.cache.Region.Entry;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.DistributedPutAllOperation;
+import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.KeyInfo;
@@ -45,17 +46,16 @@ import org.apache.geode.internal.cache.RemoteContainsKeyValueMessage.RemoteConta
import org.apache.geode.internal.cache.RemoteOperationMessage.RemoteOperationResponse;
import org.apache.geode.internal.cache.RemotePutMessage.PutResult;
import org.apache.geode.internal.cache.RemotePutMessage.RemotePutResponse;
-import org.apache.geode.internal.cache.partitioned.RemoteSizeMessage;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.i18n.LocalizedStrings;
public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
- private final LocalRegion region;
+ private final DistributedRegion region;
- public DistributedTXRegionStub(TXStateStub txstate, LocalRegion r) {
- super(txstate, r);
+ public DistributedTXRegionStub(TXStateStub txstate, DistributedRegion r) {
+ super(txstate);
this.region = r;
}
@@ -224,18 +224,7 @@ public class DistributedTXRegionStub extends AbstractPeerTXRegionStub {
public int entryCount() {
- try {
- RemoteSizeMessage.SizeResponse response =
- RemoteSizeMessage.send(Collections.singleton(state.getTarget()), region);
- return response.waitForSize();
- } catch (RegionDestroyedException rde) {
- throw new TransactionDataNotColocatedException(
- LocalizedStrings.RemoteMessage_REGION_0_NOT_COLOCATED_WITH_TRANSACTION
- .toLocalizedString(rde.getRegionFullPath()),
- rde);
- } catch (Exception e) {
- throw new TransactionException(e);
- }
+ return this.region.getRegionSize(this.state.getTarget());
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
index 10ae7a5..5673c68 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tx/PartitionedTXRegionStub.java
@@ -63,8 +63,11 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
*/
private Map<Integer, Boolean> buckets = new HashMap<Integer, Boolean>();
- public PartitionedTXRegionStub(TXStateStub txstate, LocalRegion r) {
- super(txstate, r);
+ private final PartitionedRegion region;
+
+ public PartitionedTXRegionStub(TXStateStub txstate, PartitionedRegion r) {
+ super(txstate);
+ this.region = r;
}
public Map<Integer, Boolean> getBuckets() {
@@ -136,7 +139,7 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
private boolean isKeyInNonColocatedBucket(KeyInfo keyInfo) {
Map<Region<?, ?>, TXRegionStub> regionStubs = this.state.getRegionStubs();
Collection<PartitionedRegion> colcatedRegions = (Collection<PartitionedRegion>) ColocationHelper
- .getAllColocationRegions((PartitionedRegion) this.region).values();
+ .getAllColocationRegions(this.region).values();
// get all colocated region buckets touched in the transaction
for (PartitionedRegion colcatedRegion : colcatedRegions) {
PartitionedTXRegionStub regionStub =
@@ -160,9 +163,8 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
public Entry getEntry(KeyInfo keyInfo, boolean allowTombstones) {
- PartitionedRegion pr = (PartitionedRegion) region;
try {
- Entry e = pr.getEntryRemotely((InternalDistributedMember) state.getTarget(),
+ Entry e = region.getEntryRemotely((InternalDistributedMember) state.getTarget(),
keyInfo.getBucketId(), keyInfo.getKey(), false, allowTombstones);
trackBucketForTx(keyInfo);
return e;
@@ -238,9 +240,8 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
public boolean containsKey(KeyInfo keyInfo) {
- PartitionedRegion pr = (PartitionedRegion) region;
try {
- boolean retVal = pr.containsKeyRemotely((InternalDistributedMember) state.getTarget(),
+ boolean retVal = region.containsKeyRemotely((InternalDistributedMember) state.getTarget(),
keyInfo.getBucketId(), keyInfo.getKey());
trackBucketForTx(keyInfo);
return retVal;
@@ -282,10 +283,9 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
public boolean containsValueForKey(KeyInfo keyInfo) {
- PartitionedRegion pr = (PartitionedRegion) region;
try {
- boolean retVal = pr.containsValueForKeyRemotely((InternalDistributedMember) state.getTarget(),
- keyInfo.getBucketId(), keyInfo.getKey());
+ boolean retVal = region.containsValueForKeyRemotely(
+ (InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(), keyInfo.getKey());
trackBucketForTx(keyInfo);
return retVal;
} catch (TransactionException e) {
@@ -318,10 +318,10 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
Object retVal = null;
final Object key = keyInfo.getKey();
final Object callbackArgument = keyInfo.getCallbackArg();
- PartitionedRegion pr = (PartitionedRegion) region;
try {
- retVal = pr.getRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(),
- key, callbackArgument, peferCD, requestingClient, clientEvent, false);
+ retVal =
+ region.getRemotely((InternalDistributedMember) state.getTarget(), keyInfo.getBucketId(),
+ key, callbackArgument, peferCD, requestingClient, clientEvent, false);
} catch (TransactionException e) {
RuntimeException re = getTransactionException(keyInfo, e);
re.initCause(e.getCause());
@@ -347,12 +347,11 @@ public class PartitionedTXRegionStub extends AbstractPeerTXRegionStub {
public Object getEntryForIterator(KeyInfo keyInfo, boolean allowTombstones) {
- PartitionedRegion pr = (PartitionedRegion) region;
- InternalDistributedMember primary = pr.getBucketPrimary(keyInfo.getBucketId());
+ InternalDistributedMember primary = region.getBucketPrimary(keyInfo.getBucketId());
if (primary.equals(state.getTarget())) {
return getEntry(keyInfo, allowTombstones);
} else {
- return pr.getSharedDataView().getEntry(keyInfo, pr, allowTombstones);
+ return region.getSharedDataView().getEntry(keyInfo, region, allowTombstones);
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
index 96b89b9..c7ae750 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ClientServerTransactionDUnitTest.java
@@ -4065,4 +4065,171 @@ public class ClientServerTransactionDUnitTest extends RemoteTransactionDUnitTest
Assert.fail("Unexpected exception while doing JTA Transaction2 ", e);
}
}
+
+ @Test
+ public void testPartitionMessageSetsClientMemberIdAsTxMemberId() {
+ Host host = Host.getHost(0);
+ VM server1 = host.getVM(0);
+ VM client = host.getVM(2);
+ int totalBuckets = 50;
+ String regionName = "region";
+
+ setupRegionForClientTransactions(totalBuckets, regionName, false, null);
+
+ client.invokeAsync(() -> doKeySetOpTransaction(1, regionName, totalBuckets, false, null));
+
+ // Should cause TXId(server1, 1) to be executed on server2
+ server1.invoke(() -> doPutOpTransaction(regionName, totalBuckets));
+ }
+
+ private void doKeySetOpTransaction(int firstGetKey, String regionName, int totalBuckets,
+ boolean withReplicateRegion, String region2Name) {
+ Region<Integer, String> region = getCache().getRegion(regionName);
+ TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+ txMgr.begin();
+ region.get(firstGetKey); // starts TXState on a server with the primary bucket of the key
+ verifyKeySetOp(totalBuckets, region);
+
+ if (withReplicateRegion) {
+ Region<Integer, String> region2 = getCache().getRegion(region2Name);
+ int num = totalBuckets + 1;
+ region2.put(num, "" + num);
+ verifyKeySetOp(num, region2);
+ }
+ txMgr.rollback();
+ }
+
+ private void verifyKeySetOp(int expected, Region<Integer, String> region) {
+ Set<Integer> keys = region.keySet();
+ assertEquals(expected, keys.size());
+ for (Integer key : keys) {
+ assertTrue(key <= expected);
+ }
+ }
+
+ private void doPutOpTransaction(String regionName, int totalBuckets) throws InterruptedException {
+ TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+ Region<Integer, String> region = getCache().getRegion(regionName);
+ txMgr.begin();
+ region.put(2, "NEWVALUE");
+ Thread.currentThread().sleep(100);
+ txMgr.commit();
+ }
+
+ private void doSizeOpTransactions(String regionName, int totalBuckets, String region2Name) {
+ for (int i = 1; i <= totalBuckets; i++) {
+ doSizeOpTransaction(i, regionName, totalBuckets, region2Name);
+ }
+ }
+
+ private void doSizeOpTransaction(int key, String regionName, int totalBuckets,
+ String region2Name) {
+ Region<Integer, String> region = getCache().getRegion(regionName);
+ Region<Integer, String> region2 = getCache().getRegion(region2Name);
+ TXManagerImpl txMgr = (TXManagerImpl) getCache().getCacheTransactionManager();
+ txMgr.begin();
+ region.get(key); // starts TXState on different servers
+ assertEquals(totalBuckets, region.size());
+ int num = totalBuckets + 1;
+ region2.put(num, "" + num);
+ assertEquals(num, region2.size());
+ txMgr.rollback();
+ }
+
+ @Test
+ public void testSizeOpInTransaction() {
+ Host host = Host.getHost(0);
+ VM client = host.getVM(2);
+ String regionName = "region";
+ String region2Name = "region2";
+ int totalBuckets = 2;
+ setupRegionForClientTransactions(totalBuckets, regionName, true, region2Name);
+
+ client.invoke(() -> doSizeOpTransactions(regionName, totalBuckets, region2Name));
+ }
+
+ private void setupRegionForClientTransactions(int totalBuckets, String regionName,
+ boolean withReplicateRegion, String region2Name) {
+ Host host = Host.getHost(0);
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+ VM client = host.getVM(2);
+ int port = createRegionsAndStartServer(server1, true);
+
+ createPRAndInitABucketOnServer1(totalBuckets, regionName, server1);
+
+ createPRAndInitOtherBucketsOnServer2(totalBuckets, regionName, server2);
+
+ if (withReplicateRegion) {
+ initReplicateRegion(totalBuckets, region2Name, server1, server2);
+ }
+
+ createRegionOnClient(regionName, withReplicateRegion, region2Name, client, port);
+ }
+
+ private void createRegionOnClient(String regionName, boolean withReplicateRegion,
+ String region2Name, VM client, int port) {
+ client.invoke(() -> {
+ createClient(port, regionName);
+ if (withReplicateRegion) {
+ createClient(port, region2Name);
+ }
+ });
+ }
+
+ private void initReplicateRegion(int totalBuckets, String region2Name, VM server1, VM server2) {
+ server1.invoke(() -> createReplicateRegion(region2Name));
+ server2.invoke(() -> {
+ createReplicateRegion(region2Name);
+ Region<Integer, String> region = getCache().getRegion(region2Name);
+ for (int i = totalBuckets; i > 0; i--) {
+ region.put(i, "" + i);
+ }
+ });
+ }
+
+ private void createPRAndInitOtherBucketsOnServer2(int totalBuckets, String regionName,
+ VM server2) {
+ createRegionOnServer(server2);
+ server2.invoke(() -> {
+ createSubscriptionRegion(false, regionName, 0, totalBuckets);
+ Region<Integer, String> region = getCache().getRegion(regionName);
+ for (int i = totalBuckets; i > 1; i--) {
+ region.put(i, "VALUE-" + i);
+ }
+ });
+ }
+
+ private void createPRAndInitABucketOnServer1(int totalBuckets, String regionName, VM server1) {
+ server1.invoke(() -> {
+ createSubscriptionRegion(false, regionName, 0, totalBuckets);
+ Region<Integer, String> region = getCache().getRegion(regionName);
+ // should create first bucket on server1
+ region.put(1, "VALUE-1");
+ });
+ }
+
+ @Test
+ public void testKeySetOpInTransaction() {
+ Host host = Host.getHost(0);
+ VM client = host.getVM(2);
+ String regionName = "region";
+ String region2Name = "region2";
+ int totalBuckets = 2;
+ setupRegionForClientTransactions(totalBuckets, regionName, true, region2Name);
+
+ client.invoke(() -> doKeySetOpTransactions(regionName, totalBuckets, true, region2Name));
+ }
+
+ private void doKeySetOpTransactions(String regionName, int totalBuckets,
+ boolean withReplicateRegion, String region2Name) {
+ for (int i = 1; i <= totalBuckets; i++) {
+ doKeySetOpTransaction(i, regionName, totalBuckets, withReplicateRegion, region2Name);
+ }
+ }
+
+ private void createReplicateRegion(String regionName) {
+ RegionFactory rf = getCache().createRegionFactory(RegionShortcut.REPLICATE);
+ Region<Integer, String> region = rf.create(regionName);
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java
index 80888ef..a7240bb 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tx/AbstractPeerTXRegionStubTest.java
@@ -51,7 +51,7 @@ public class AbstractPeerTXRegionStubTest {
private class TestingAbstractPeerTXRegionStub extends AbstractPeerTXRegionStub {
private TestingAbstractPeerTXRegionStub(TXStateStub txState, LocalRegion r) {
- super(txState, r);
+ super(txState);
}
@Override
--
To stop receiving notification emails like this one, please contact
"commits@geode.apache.org" <co...@geode.apache.org>.