You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2018/06/13 17:20:12 UTC
[geode] branch develop updated: GEODE-5305: Add flag to
TXCommitMessage to indicate the use of shadow key (#2043)
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 80aabee GEODE-5305: Add flag to TXCommitMessage to indicate the use of shadow key (#2043)
80aabee is described below
commit 80aabeede64c81f92437bc1c8be11a4000ed43e2
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Wed Jun 13 10:20:07 2018 -0700
GEODE-5305: Add flag to TXCommitMessage to indicate the use of shadow key (#2043)
* Add new unit test for TxCommentMessage serialization.
Co-authored-by: Darrel Schneider <ds...@pivotal.io>
---
.../geode/internal/InternalDataSerializer.java | 125 ++-
.../geode/internal/cache/TXCommitMessage.java | 230 ++--
.../apache/geode/internal/cache/TXRegionState.java | 4 -
.../geode/internal/cache/TxCommitMessageTest.java | 1167 ++++++++++++++++++++
.../codeAnalysis/sanctionedDataSerializables.txt | 8 +-
5 files changed, 1363 insertions(+), 171 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 7ab6284..94aea2c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -141,23 +141,22 @@ public abstract class InternalDataSerializer extends DataSerializer {
/**
- * This list contains classes that Geode's classes subclass, such as antlr AST classes which
- * are used by our Object Query Language. It also contains certain
- * classes that are DataSerializable but end up being serialized as part of other serializable
- * objects. VersionedObjectList, for instance, is serialized as part of a
- * partial putAll exception object.
+ * This list contains classes that Geode's classes subclass, such as antlr AST classes which are
+ * used by our Object Query Language. It also contains certain classes that are DataSerializable
+ * but end up being serialized as part of other serializable objects. VersionedObjectList, for
+ * instance, is serialized as part of a partial putAll exception object.
* <p>
- * Do not java-serialize objects that Geode does not have complete control over. This
- * leaves us open to security attacks such as Gadget Chains and compromises the ability
- * to do a rolling upgrade from one version of Geode to the next.
+ * Do not java-serialize objects that Geode does not have complete control over. This leaves us
+ * open to security attacks such as Gadget Chains and compromises the ability to do a rolling
+ * upgrade from one version of Geode to the next.
* <p>
* In general you shouldn't use java serialization and you should implement
- * DataSerializableFixedID
- * for internal Geode objects. This gives you better control over backward-compatibility.
+ * DataSerializableFixedID for internal Geode objects. This gives you better control over
+ * backward-compatibility.
* <p>
- * Do not add to this list unless absolutely necessary. Instead put your classes either
- * in the sanctionedSerializables file for your module or in its excludedClasses file.
- * Run AnalyzeSerializables to generate the content for the file.
+ * Do not add to this list unless absolutely necessary. Instead put your classes either in the
+ * sanctionedSerializables file for your module or in its excludedClasses file. Run
+ * AnalyzeSerializables to generate the content for the file.
* <p>
*/
private static final String SANCTIONED_SERIALIZABLES_DEPENDENCIES_PATTERN =
@@ -798,8 +797,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
new ConcurrentHashMap<>();
/**
- * {@code RegistrationListener}s that receive callbacks when {@code DataSerializer}s and
- * {@code Instantiator}s are registered. Note: copy-on-write access used for this set
+ * {@code RegistrationListener}s that receive callbacks when {@code DataSerializer}s and {@code
+ * Instantiator}s are registered. Note: copy-on-write access used for this set
*/
private static volatile Set listeners = new HashSet();
@@ -825,7 +824,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
* Instantiates an instance of {@code DataSerializer}
*
* @throws IllegalArgumentException If the class can't be instantiated
- *
* @see DataSerializer#register(Class)
*/
static DataSerializer newInstance(Class c) {
@@ -843,7 +841,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
StringId s = LocalizedStrings.DataSerializer_CLASS_0_DOES_NOT_HAVE_A_ZEROARGUMENT_CONSTRUCTOR;
Object[] args = new Object[] {c.getName()};
if (c.getDeclaringClass() != null) {
- s = LocalizedStrings.DataSerializer_CLASS_0_DOES_NOT_HAVE_A_ZEROARGUMENT_CONSTRUCTOR_IT_IS_AN_INNER_CLASS_OF_1_SHOULD_IT_BE_A_STATIC_INNER_CLASS;
+ s =
+ LocalizedStrings.DataSerializer_CLASS_0_DOES_NOT_HAVE_A_ZEROARGUMENT_CONSTRUCTOR_IT_IS_AN_INNER_CLASS_OF_1_SHOULD_IT_BE_A_STATIC_INNER_CLASS;
args = new Object[] {c.getName(), c.getDeclaringClass()};
}
throw new IllegalArgumentException(s.toLocalizedString(args));
@@ -889,7 +888,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
*
* @param distribute Should the registered {@code DataSerializer} be distributed to other members
* of the distributed system?
- *
* @see DataSerializer#register(Class)
*/
public static DataSerializer register(Class c, boolean distribute) {
@@ -1018,8 +1016,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
/**
- * Marks a {@code DataSerializer} className for registration with the data serialization
- * framework if and when it is needed. Does not necessarily load the classes into this VM.
+ * Marks a {@code DataSerializer} className for registration with the data serialization framework
+ * if and when it is needed. Does not necessarily load the classes into this VM.
*
* @param className Name of the DataSerializer class.
* @param distribute If true, distribute this data serializer.
@@ -1074,10 +1072,10 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
/**
- * During client/server handshakes the server may send a collection of DataSerializers and
- * the classes they support. The DataSerializers are registered as "holders" to avoid loading the
- * actual classes until they're needed. This method registers the names of classes supported
- * by the DataSerializers
+ * During client/server handshakes the server may send a collection of DataSerializers and the
+ * classes they support. The DataSerializers are registered as "holders" to avoid loading the
+ * actual classes until they're needed. This method registers the names of classes supported by
+ * the DataSerializers
*
* @param map The classes returned by DataSerializer.supportedClasses()
*/
@@ -1100,10 +1098,10 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
/**
- * A SerializerAttributesHolder holds information required to load a DataSerializer
- * and exists to allow client/server connections to be created more quickly than
- * they would if the DataSerializer information downloaded from the server were
- * used to immediately load the corresponding classes.
+ * A SerializerAttributesHolder holds information required to load a DataSerializer and exists to
+ * allow client/server connections to be created more quickly than they would if the
+ * DataSerializer information downloaded from the server were used to immediately load the
+ * corresponding classes.
*/
public static class SerializerAttributesHolder {
private String className = "";
@@ -1121,7 +1119,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
/**
- *
* @return String the classname of the data serializer this instance represents.
*/
public String getClassName() {
@@ -1230,10 +1227,9 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
/**
- * Returns the {@code DataSerializer} for the given class. If no class has been registered,
- * {@code null} is returned. Remember that it is okay to return {@code null} in this case. This
- * method is invoked when writing an object. If a serializer isn't available, then its the user's
- * fault.
+ * Returns the {@code DataSerializer} for the given class. If no class has been registered, {@code
+ * null} is returned. Remember that it is okay to return {@code null} in this case. This method is
+ * invoked when writing an object. If a serializer isn't available, then its the user's fault.
*/
private static DataSerializer getSerializer(Class c) {
DataSerializer ds = classesToSerializers.get(c.getName());
@@ -1813,14 +1809,11 @@ public abstract class InternalDataSerializer extends DataSerializer {
/**
* Writes a {@code Set} to a {@code DataOutput}.
* <P>
- * This method is internal because its semantics (that is, its ability to write any kind of
- * {@code Set}) are different from the {@code write}XXX methods of the external
- * {@code DataSerializer}.
+ * This method is internal because its semantics (that is, its ability to write any kind of {@code
+ * Set}) are different from the {@code write}XXX methods of the external {@code DataSerializer}.
*
* @throws IOException A problem occurs while writing to {@code out}
- *
* @see #readSet
- *
* @since GemFire 4.0
*/
public static void writeSet(Collection<?> set, DataOutput out) throws IOException {
@@ -1849,9 +1842,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
* @throws IOException A problem occurs while writing to {@code out}
* @throws ClassNotFoundException The class of one of the {@code HashSet}'s elements cannot be
* found.
- *
* @see #writeSet
- *
* @since GemFire 4.0
*/
public static Set readSet(DataInput in) throws IOException, ClassNotFoundException {
@@ -1864,7 +1855,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
*
* @throws IOException A problem occurs while reading from {@code in}
* @throws ClassNotFoundException The class of one of the {@code Set}'s elements cannot be found.
- *
* @see #writeSet
*/
public static <E> boolean readCollection(DataInput in, Collection<E> c)
@@ -1912,7 +1902,9 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
}
- /** read a set of Long objects */
+ /**
+ * read a set of Long objects
+ */
public static Set<Long> readSetOfLongs(DataInput in) throws IOException {
int size = in.readInt();
if (size < 0) {
@@ -1953,7 +1945,9 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
}
- /** read a set of Long objects */
+ /**
+ * read a set of Long objects
+ */
public static List<Long> readListOfLongs(DataInput in) throws IOException {
int size = in.readInt();
if (size < 0) {
@@ -1970,7 +1964,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
-
/**
* Writes the type code for a primitive type Class to {@code DataOutput}.
*/
@@ -2175,7 +2168,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
* Writes given number of characters from array of {@code char}s to a {@code DataOutput}.
*
* @throws IOException A problem occurs while writing to {@code out}
- *
* @see DataSerializer#readCharArray
* @since GemFire 6.6
*/
@@ -2484,7 +2476,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
- throw new ToDataException("toData failed on DataSerializable " + ds.getClass(), t);
+ throw new ToDataException(
+ "toData failed on DataSerializable " + null == ds ? "null" : ds.getClass().toString(), t);
}
}
@@ -2702,7 +2695,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
/**
- * Serializes a list of Integers. The argument may be null. Deserialize with readListOfIntegers().
+ * Serializes a list of Integers. The argument may be null. Deserialize with
+ * readListOfIntegers().
*
* TODO: writeListOfIntegers is unused
*/
@@ -2762,7 +2756,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
* being read. The return value may be {@code null}.
*
* @throws IOException A problem occurs while reading from {@code in}
- *
* @since GemFire 5.7
*/
public static String readString(DataInput in, byte header) throws IOException {
@@ -3341,10 +3334,14 @@ public abstract class InternalDataSerializer extends DataSerializer {
* wait an amount of time for the registration message to arrive.
*/
abstract static class Marker {
- /** The DataSerializer that is filled in upon registration */
+ /**
+ * The DataSerializer that is filled in upon registration
+ */
protected DataSerializer serializer = null;
- /** set to true once setSerializer is called. */
+ /**
+ * set to true once setSerializer is called.
+ */
boolean hasBeenSet = false;
abstract DataSerializer getSerializer();
@@ -3361,6 +3358,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
}
}
+
/**
* A marker object for {@code DataSerializer}s that have not been registered. Using this marker
* object allows us to asynchronously send {@code DataSerializer} registration updates. If the
@@ -3426,6 +3424,7 @@ public abstract class InternalDataSerializer extends DataSerializer {
* yet, then this method will wait until the serializer is registered. If this method has to
* wait for too long, then {@code null} is returned.
*/
+
/**
* Returns the serializer associated with this marker. Waits forever (unless interrupted) for it
* to be initialized. Returns null if this Marker failed to initialize.
@@ -3446,9 +3445,10 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
}
}
+
/**
- * A distribution message that alerts other members of the distributed cache of a new
- * {@code DataSerializer} being registered.
+ * A distribution message that alerts other members of the distributed cache of a new {@code
+ * DataSerializer} being registered.
*/
public static class RegistrationMessage extends SerialDistributionMessage {
/**
@@ -3461,10 +3461,14 @@ public abstract class InternalDataSerializer extends DataSerializer {
*/
protected EventID eventId;
- /** The name of the {@code DataSerializer} class */
+ /**
+ * The name of the {@code DataSerializer} class
+ */
private String className;
- /** The versions in which this message was modified */
+ /**
+ * The versions in which this message was modified
+ */
private static final Version[] dsfidVersions = new Version[] {};
/**
@@ -3473,8 +3477,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
public RegistrationMessage() {}
/**
- * Creates a new {@code RegistrationMessage} that broadcasts that the given
- * {@code DataSerializer} was registered.
+ * Creates a new {@code RegistrationMessage} that broadcasts that the given {@code
+ * DataSerializer} was registered.
*/
public RegistrationMessage(DataSerializer s) {
this.className = s.getClass().getName();
@@ -3588,8 +3592,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
}
/**
- * A listener whose listener methods are invoked when {@link DataSerializer}s and
- * {@link Instantiator}s are registered. This is part of the fix for bug 31422.
+ * A listener whose listener methods are invoked when {@link DataSerializer}s and {@link
+ * Instantiator}s are registered. This is part of the fix for bug 31422.
*
* @see InternalDataSerializer#addRegistrationListener
* @see InternalDataSerializer#removeRegistrationListener
@@ -3616,8 +3620,8 @@ public abstract class InternalDataSerializer extends DataSerializer {
private static class DSObjectInputStream extends ObjectInputStream {
/**
- * Creates a new {@code DSObjectInputStream} that delegates its behavior to a given
- * {@code InputStream}.
+ * Creates a new {@code DSObjectInputStream} that delegates its behavior to a given {@code
+ * InputStream}.
*/
DSObjectInputStream(InputStream stream) throws IOException {
super(stream);
@@ -3708,7 +3712,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
/**
* Just like a WellKnownDS but its type is compatible with PDX.
- *
*/
protected abstract static class WellKnownPdxDS extends WellKnownDS {
// subclasses need to implement toData
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 8354430..722130e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -115,10 +115,14 @@ public class TXCommitMessage extends PooledDistributionMessage
private transient long farsideBaseThreadId; // only available on farside
private transient long farsideBaseSequenceId; // only available on farside
- /** (Nearside) true of any regions in this TX have required roles */
+ /**
+ * (Nearside) true of any regions in this TX have required roles
+ */
private transient boolean hasReliableRegions = false;
- /** Set of all caching exceptions produced hile processing this tx */
+ /**
+ * Set of all caching exceptions produced hile processing this tx
+ */
private transient Set processingExceptions = Collections.emptySet();
private transient ClientProxyMembershipID bridgeContext = null;
@@ -136,7 +140,8 @@ public class TXCommitMessage extends PooledDistributionMessage
public static final TXCommitMessage CMT_CONFLICT_MSG = new TXCommitMessage();
/**
* A token to be put in TXManagerImpl#failoverMap to represent a
- * TransactionDataNodeHasDepartedException while committing a transaction
+ * TransactionDataNodeHasDepartedException
+ * while committing a transaction
*/
public static final TXCommitMessage REBALANCE_MSG = new TXCommitMessage();
/**
@@ -200,7 +205,7 @@ public class TXCommitMessage extends PooledDistributionMessage
// make sure we have some changes and someone to send them to
if (!this.currentRegion.isEmpty() && s != null && !s.isEmpty()) {
// Get the persistent ids for the current region and save them
- this.currentRegion.persistentIds = getPersistentIds(this.currentRegion.r);
+ this.currentRegion.persistentIds = getPersistentIds(this.currentRegion.internalRegion);
if (this.msgMap == null) {
this.msgMap = new HashMap<>();
@@ -279,7 +284,9 @@ public class TXCommitMessage extends PooledDistributionMessage
private transient boolean disableListeners = false;
- /** record CacheDistributionAdvisor.startOperation versions for later cleanup */
+ /**
+ * record CacheDistributionAdvisor.startOperation versions for later cleanup
+ */
protected void addViewVersion(DistributedRegion dr, long version) {
viewVersions.put(dr, version);
}
@@ -323,7 +330,7 @@ public class TXCommitMessage extends PooledDistributionMessage
updateLockMembers();
IdentityHashMap distMap = new IdentityHashMap(); // Map of RegionCommitList keys to Sets of
- // receivers
+ // receivers
HashSet ackReceivers = null;
{
Iterator it = this.msgMap.entrySet().iterator();
@@ -450,7 +457,7 @@ public class TXCommitMessage extends PooledDistributionMessage
for (Iterator rclIter = rcl.iterator(); rclIter.hasNext();) {
RegionCommit rc = (RegionCommit) rclIter.next();
// skip region if no required roles
- if (!rc.r.requiresReliabilityCheck()) {
+ if (!rc.internalRegion.requiresReliabilityCheck()) {
continue;
}
@@ -484,20 +491,20 @@ public class TXCommitMessage extends PooledDistributionMessage
// remove members who destroyed that region or closed their cache
Set regionDestroyedMembers = (processor == null) ? Collections.emptySet()
- : processor.getRegionDestroyedMembers(rc.r.getFullPath());
+ : processor.getRegionDestroyedMembers(rc.internalRegion.getFullPath());
successfulRecipients.removeAll(cacheClosedMembers);
successfulRecipients.removeAll(regionDestroyedMembers);
try {
- rc.r.handleReliableDistribution(successfulRecipients);
+ rc.internalRegion.handleReliableDistribution(successfulRecipients);
} catch (RegionDistributionException e) {
if (regionDistributionExceptions == Collections.emptySet()) {
regionDistributionExceptions = new HashSet();
failedRegionNames = new HashSet();
}
regionDistributionExceptions.add(e);
- failedRegionNames.add(rc.r.getFullPath());
+ failedRegionNames.add(rc.internalRegion.getFullPath());
}
}
@@ -576,14 +583,6 @@ public class TXCommitMessage extends PooledDistributionMessage
this.dm = dm;
}
- public DistributionManager getDM() {
- if (this.dm == null) {
- InternalCache cache = GemFireCacheImpl.getExisting("Applying TXCommit");
- this.dm = cache.getDistributionManager();
- }
- return this.dm;
- }
-
public void basicProcess() {
final DistributionManager dm = this.dm;
@@ -630,7 +629,7 @@ public class TXCommitMessage extends PooledDistributionMessage
processCacheRuntimeException(problem);
} finally {
if (failedBeginProcess) {
- rc.r = null; // Cause related FarSideEntryOps to skip processing
+ rc.internalRegion = null; // Cause related FarSideEntryOps to skip processing
it.remove(); // Skip endProcessing as well
}
}
@@ -813,16 +812,17 @@ public class TXCommitMessage extends PooledDistributionMessage
this.farsideBaseThreadId = in.readLong();
this.farsideBaseSequenceId = in.readLong();
-
this.needsLargeModCount = in.readBoolean();
+ final boolean hasShadowKeys = hasFlagsField(in) ? in.readBoolean() : useShadowKey();
+
int regionsSize = in.readInt();
this.regions = new ArrayList(regionsSize);
this.farSideEntryOps = new ArrayList(totalMaxSize);
for (int i = 0; i < regionsSize; i++) {
RegionCommit rc = new RegionCommit(this);
try {
- rc.fromData(in);
+ rc.fromData(in, hasShadowKeys);
} catch (CacheClosedException cce) {
addProcessingException(cce);
// return to avoid serialization error being sent in reply
@@ -898,13 +898,17 @@ public class TXCommitMessage extends PooledDistributionMessage
DataSerializer.writeBoolean(this.needsLargeModCount, out);
}
+ final boolean useShadowKey = useShadowKey();
+ if (hasFlagsField(out)) {
+ out.writeBoolean(useShadowKey);
+ }
out.writeInt(regionsSize);
{
if (regionsSize > 0) {
for (int i = 0; i < this.regions.size(); i++) {
RegionCommit rc = (RegionCommit) this.regions.get(i);
- rc.toData(out);
+ rc.toData(out, useShadowKey);
}
}
}
@@ -914,6 +918,21 @@ public class TXCommitMessage extends PooledDistributionMessage
DataSerializer.writeHashSet(this.farSiders, out);
}
+ private boolean hasFlagsField(final DataOutput out) {
+ return hasFlagsField(InternalDataSerializer.getVersionForDataStream(out));
+ }
+
+ private boolean hasFlagsField(final DataInput in) {
+ return hasFlagsField(InternalDataSerializer.getVersionForDataStream(in));
+ }
+
+ private boolean hasFlagsField(final Version version) {
+ return version.compareTo(Version.GEODE_180) >= 0;
+ }
+
+ private boolean useShadowKey() {
+ return null == clientVersion;
+ }
@Override
public String toString() {
@@ -946,7 +965,8 @@ public class TXCommitMessage extends PooledDistributionMessage
/**
* Combines a set of small TXCommitMessages that belong to one transaction into a txCommitMessage
* that represents an entire transaction. At commit time the txCommitMessage sent to each node can
- * be a subset of the transaction, this method will combine those subsets into a complete message.
+ * be a subset of the transaction, this method will combine those subsets into a complete
+ * message.
*
* @return the complete txCommitMessage
*/
@@ -1052,7 +1072,7 @@ public class TXCommitMessage extends PooledDistributionMessage
/**
* The region that this commit represents. Valid on both nearside and farside.
*/
- protected transient InternalRegion r;
+ protected transient InternalRegion internalRegion;
/**
* Valid only on farside.
*/
@@ -1100,7 +1120,7 @@ public class TXCommitMessage extends PooledDistributionMessage
*/
RegionCommit(TXCommitMessage msg, InternalRegion r, int maxSize) {
this.msg = msg;
- this.r = r;
+ this.internalRegion = r;
this.maxSize = maxSize;
}
@@ -1144,20 +1164,21 @@ public class TXCommitMessage extends PooledDistributionMessage
if (!hookupRegion(dm)) {
return false;
}
- if (msg.isAckRequired() && (this.r == null || !this.r.getScope().isDistributed())) {
+ if (msg.isAckRequired()
+ && (this.internalRegion == null || !this.internalRegion.getScope().isDistributed())) {
if (logger.isDebugEnabled()) {
logger.debug("Received unneeded commit data for region {}", this.regionPath);
}
this.msg.addProcessingException(new RegionDestroyedException(
LocalizedStrings.TXCommitMessage_REGION_NOT_FOUND.toLocalizedString(),
this.regionPath));
- this.r = null;
+ this.internalRegion = null;
return false;
}
- this.needsUnlock = this.r.lockGII();
- this.r.txLRUStart();
+ this.needsUnlock = this.internalRegion.lockGII();
+ this.internalRegion.txLRUStart();
this.needsLRUEnd = true;
- if (this.r.isInitialized()) {
+ if (this.internalRegion.isInitialized()) {
// We don't want the txEvent to know anything about our regions
// that are still doing gii.
this.txEvent = txEvent;
@@ -1170,18 +1191,18 @@ public class TXCommitMessage extends PooledDistributionMessage
"Received unneeded commit data for region {} because the region was destroyed.",
this.regionPath, e);
}
- this.r = null;
+ this.internalRegion = null;
}
- return this.r != null;
+ return this.internalRegion != null;
}
private boolean hookupRegion(DistributionManager dm) {
- this.r = getRegionByPath(dm, regionPath);
- if (this.r == null && this.parentRegionPath != null) {
- this.r = getRegionByPath(dm, this.parentRegionPath);
+ this.internalRegion = getRegionByPath(dm, regionPath);
+ if (this.internalRegion == null && this.parentRegionPath != null) {
+ this.internalRegion = getRegionByPath(dm, this.parentRegionPath);
this.regionPath = this.parentRegionPath;
}
- if (this.r == null && dm.getSystem().isLoner()) {
+ if (this.internalRegion == null && dm.getSystem().isLoner()) {
// If there are additional regions that the server enlisted in the tx,
// which the client does not have, the client can just ignore the region
// see bug 51922
@@ -1199,16 +1220,16 @@ public class TXCommitMessage extends PooledDistributionMessage
* Called when processing is complete; only needs to be called if beginProcess returned true.
*/
void endProcess() {
- if (this.r != null) {
+ if (this.internalRegion != null) {
try {
if (this.needsLRUEnd) {
this.needsLRUEnd = false;
- this.r.txLRUEnd();
+ this.internalRegion.txLRUEnd();
}
} finally {
if (this.needsUnlock) {
this.needsUnlock = false;
- this.r.unlockGII();
+ this.internalRegion.unlockGII();
}
}
}
@@ -1229,25 +1250,27 @@ public class TXCommitMessage extends PooledDistributionMessage
*/
@SuppressWarnings("synthetic-access")
protected void txApplyEntryOp(FarSideEntryOp entryOp, List<EntryEventImpl> pendingCallbacks) {
- if (this.r == null) {
+ if (this.internalRegion == null) {
return;
}
EventID eventID = getEventId(entryOp);
- boolean isDuplicate = this.r.hasSeenEvent(eventID);
- boolean callbacksOnly = (this.r.getDataPolicy() == DataPolicy.PARTITION) || isDuplicate;
- if (this.r instanceof PartitionedRegion) {
+ boolean isDuplicate = this.internalRegion.hasSeenEvent(eventID);
+ boolean callbacksOnly =
+ (this.internalRegion.getDataPolicy() == DataPolicy.PARTITION) || isDuplicate;
+ if (this.internalRegion instanceof PartitionedRegion) {
/*
* This happens when we don't have the bucket and are getting adjunct notification
*/
// No need to release because it is added to pendingCallbacks and they will be released
// later
- EntryEventImpl eei = AbstractRegionMap.createCallbackEvent(this.r, entryOp.op, entryOp.key,
- entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,
- entryOp.filterRoutingInfo, this.msg.bridgeContext, null, entryOp.versionTag,
- entryOp.tailKey);
+ EntryEventImpl eei =
+ AbstractRegionMap.createCallbackEvent(this.internalRegion, entryOp.op, entryOp.key,
+ entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,
+ entryOp.filterRoutingInfo, this.msg.bridgeContext, null, entryOp.versionTag,
+ entryOp.tailKey);
if (entryOp.filterRoutingInfo != null) {
eei.setLocalFilterInfo(
- entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId()));
+ entryOp.filterRoutingInfo.getFilterInfo(this.internalRegion.getCache().getMyId()));
}
if (isDuplicate) {
eei.setPossibleDuplicate(true);
@@ -1274,17 +1297,19 @@ public class TXCommitMessage extends PooledDistributionMessage
entryOp.versionTag.replaceNullIDs(this.msg.getSender());
}
if (entryOp.op.isDestroy()) {
- this.r.txApplyDestroy(entryOp.key, this.msg.txIdent, this.txEvent, this.needsUnlock,
+ this.internalRegion.txApplyDestroy(entryOp.key, this.msg.txIdent, this.txEvent,
+ this.needsUnlock,
entryOp.op, getEventId(entryOp), entryOp.callbackArg, pendingCallbacks,
entryOp.filterRoutingInfo, this.msg.bridgeContext, false /* origin remote */,
null/* txEntryState */, entryOp.versionTag, entryOp.tailKey);
} else if (entryOp.op.isInvalidate()) {
- this.r.txApplyInvalidate(entryOp.key, Token.INVALID, entryOp.didDestroy, this.msg.txIdent,
+ this.internalRegion.txApplyInvalidate(entryOp.key, Token.INVALID, entryOp.didDestroy,
+ this.msg.txIdent,
this.txEvent, false /* localOp */, getEventId(entryOp), entryOp.callbackArg,
pendingCallbacks, entryOp.filterRoutingInfo, this.msg.bridgeContext,
null/* txEntryState */, entryOp.versionTag, entryOp.tailKey);
} else {
- this.r.txApplyPut(entryOp.op, entryOp.key, entryOp.value, entryOp.didDestroy,
+ this.internalRegion.txApplyPut(entryOp.op, entryOp.key, entryOp.value, entryOp.didDestroy,
this.msg.txIdent, this.txEvent, getEventId(entryOp), entryOp.callbackArg,
pendingCallbacks, entryOp.filterRoutingInfo, this.msg.bridgeContext,
null/* txEntryState */, entryOp.versionTag, entryOp.tailKey);
@@ -1296,18 +1321,19 @@ public class TXCommitMessage extends PooledDistributionMessage
*/
@SuppressWarnings("synthetic-access")
protected void txApplyEntryOpAdjunctOnly(FarSideEntryOp entryOp) {
- if (this.r == null) {
+ if (this.internalRegion == null) {
return;
}
EventID eventID = getEventId(entryOp);
- boolean isDuplicate = this.r.hasSeenEvent(eventID);
- boolean callbacksOnly = (this.r.getDataPolicy() == DataPolicy.PARTITION) || isDuplicate;
- if (this.r instanceof PartitionedRegion) {
+ boolean isDuplicate = this.internalRegion.hasSeenEvent(eventID);
+ boolean callbacksOnly =
+ (this.internalRegion.getDataPolicy() == DataPolicy.PARTITION) || isDuplicate;
+ if (this.internalRegion instanceof PartitionedRegion) {
- PartitionedRegion pr = (PartitionedRegion) r;
+ PartitionedRegion pr = (PartitionedRegion) internalRegion;
BucketRegion br = pr.getBucketRegion(entryOp.key);
Set bucketOwners = br.getBucketOwners();
- InternalDistributedMember thisMember = this.r.getDistributionManager().getId();
+ InternalDistributedMember thisMember = this.internalRegion.getDistributionManager().getId();
if (bucketOwners.contains(thisMember)) {
return;
}
@@ -1316,14 +1342,15 @@ public class TXCommitMessage extends PooledDistributionMessage
* This happens when we don't have the bucket and are getting adjunct notification
*/
@Released
- EntryEventImpl eei = AbstractRegionMap.createCallbackEvent(this.r, entryOp.op, entryOp.key,
- entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,
- entryOp.filterRoutingInfo, this.msg.bridgeContext, null, entryOp.versionTag,
- entryOp.tailKey);
+ EntryEventImpl eei =
+ AbstractRegionMap.createCallbackEvent(this.internalRegion, entryOp.op, entryOp.key,
+ entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,
+ entryOp.filterRoutingInfo, this.msg.bridgeContext, null, entryOp.versionTag,
+ entryOp.tailKey);
try {
if (entryOp.filterRoutingInfo != null) {
eei.setLocalFilterInfo(
- entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId()));
+ entryOp.filterRoutingInfo.getFilterInfo(this.internalRegion.getCache().getMyId()));
}
if (isDuplicate) {
eei.setPossibleDuplicate(true);
@@ -1337,7 +1364,7 @@ public class TXCommitMessage extends PooledDistributionMessage
// the message was sent and already reflects the change caused by this event.
// In the latter case we need to invoke listeners
final boolean skipListeners = !isDuplicate;
- eei.invokeCallbacks(this.r, skipListeners, true);
+ eei.invokeCallbacks(this.internalRegion, skipListeners, true);
} finally {
eei.release();
}
@@ -1350,7 +1377,7 @@ public class TXCommitMessage extends PooledDistributionMessage
}
boolean needsAck() {
- return this.r.getScope().isDistributedAck();
+ return this.internalRegion.getScope().isDistributedAck();
}
void addOp(Object key, TXEntryState entry) {
@@ -1371,7 +1398,8 @@ public class TXCommitMessage extends PooledDistributionMessage
return true;
}
- public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ public void fromData(DataInput in, boolean hasShadowKey)
+ throws IOException, ClassNotFoundException {
this.regionPath = DataSerializer.readString(in);
this.parentRegionPath = DataSerializer.readString(in);
@@ -1384,7 +1412,7 @@ public class TXCommitMessage extends PooledDistributionMessage
for (int i = 0; i < size; i++) {
FarSideEntryOp entryOp = new FarSideEntryOp();
// shadowkey is not being sent to clients
- entryOp.fromData(in, largeModCount, hasShadowKey(regionPath, parentRegionPath));
+ entryOp.fromData(in, largeModCount, hasShadowKey);
if (entryOp.versionTag != null && this.memberId != null) {
entryOp.versionTag.setMemberID(this.memberId);
}
@@ -1395,27 +1423,13 @@ public class TXCommitMessage extends PooledDistributionMessage
}
}
- private boolean hasShadowKey(String regionPath, String parentRegionPath) {
- // in bucket region, regionPath is bucket name, use parentRegionPath
- String path = parentRegionPath != null ? parentRegionPath : regionPath;
- LocalRegion region = getRegionByPath(msg.getDM(), path);
-
- // default value is whether loner or not, region is null if destroyRegion executed
- boolean readShadowKey = !msg.getDM().isLoner();
- if (region != null) {
- // shadowkey is not being sent to clients
- readShadowKey = region.getPoolName() == null;
- }
- return readShadowKey;
- }
-
@Override
public String toString() {
StringBuilder result = new StringBuilder(64);
if (this.regionPath != null) {
result.append(this.regionPath);
} else {
- result.append(this.r.getFullPath());
+ result.append(this.internalRegion.getFullPath());
}
if (this.refCount > 0) {
result.append(" refCount=").append(this.refCount);
@@ -1423,11 +1437,12 @@ public class TXCommitMessage extends PooledDistributionMessage
return result.toString();
}
- private void basicToData(DataOutput out) throws IOException {
- if (this.r != null) {
- DataSerializer.writeString(this.r.getFullPath(), out);
- if (this.r instanceof BucketRegion) {
- DataSerializer.writeString(((Bucket) this.r).getPartitionedRegion().getFullPath(), out);
+ private void basicToData(DataOutput out, boolean useShadowKey) throws IOException {
+ if (this.internalRegion != null) {
+ DataSerializer.writeString(this.internalRegion.getFullPath(), out);
+ if (this.internalRegion instanceof BucketRegion) {
+ DataSerializer.writeString(
+ ((Bucket) this.internalRegion).getPartitionedRegion().getFullPath(), out);
} else {
DataSerializer.writeString(null, out);
}
@@ -1455,10 +1470,10 @@ public class TXCommitMessage extends PooledDistributionMessage
if (sendVersionTags) {
VersionSource member = this.memberId;
if (member == null) {
- if (this.r == null) {
+ if (this.internalRegion == null) {
Assert.assertTrue(this.msg.txState == null);
} else {
- member = this.r.getVersionMember();
+ member = this.internalRegion.getVersionMember();
}
}
DataSerializer.writeObject(member, out);
@@ -1468,28 +1483,28 @@ public class TXCommitMessage extends PooledDistributionMessage
if (this.msg.txState != null) {
/* we are still on tx node and have the entry state */
((TXEntryState) this.opEntries.get(i)).toFarSideData(out, largeModCount,
- sendVersionTags, this.msg.clientVersion == null);
+ sendVersionTags, useShadowKey);
} else {
((FarSideEntryOp) this.opEntries.get(i)).toData(out, largeModCount, sendVersionTags,
- this.msg.clientVersion == null);
+ useShadowKey);
}
}
}
}
- public void toData(DataOutput out) throws IOException {
+ public void toData(DataOutput out, boolean useShadowKey) throws IOException {
if (this.preserializedBuffer != null) {
this.preserializedBuffer.rewind();
this.preserializedBuffer.sendTo(out);
} else if (this.refCount > 1) {
Version v = InternalDataSerializer.getVersionForDataStream(out);
HeapDataOutputStream hdos = new HeapDataOutputStream(1024, v);
- basicToData(hdos);
+ basicToData(hdos, useShadowKey);
this.preserializedBuffer = hdos;
this.preserializedBuffer.sendTo(out);
} else {
- basicToData(out);
+ basicToData(out, useShadowKey);
}
}
@@ -1621,8 +1636,9 @@ public class TXCommitMessage extends PooledDistributionMessage
@Override
public boolean equals(Object o) {
- if (o == null || !(o instanceof FarSideEntryOp))
+ if (o == null || !(o instanceof FarSideEntryOp)) {
return false;
+ }
return compareTo(o) == 0;
}
@@ -1941,9 +1957,12 @@ public class TXCommitMessage extends PooledDistributionMessage
@Override
public void quorumLost(DistributionManager distributionManager,
- Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}
+ Set<InternalDistributedMember> failures,
+ List<InternalDistributedMember> remaining) {}
- /** return true if the member initiating this transaction has left the cluster */
+ /**
+ * return true if the member initiating this transaction has left the cluster
+ */
public boolean isDepartureNoticed() {
return departureNoticed;
}
@@ -2172,11 +2191,17 @@ public class TXCommitMessage extends PooledDistributionMessage
*/
public static class CommitExceptionCollectingException extends ReplyException {
private static final long serialVersionUID = 589384721273797822L;
- /** Set of members that threw CacheClosedExceptions */
+ /**
+ * Set of members that threw CacheClosedExceptions
+ */
private final Set<InternalDistributedMember> cacheExceptions;
- /** key=region path, value=Set of members */
+ /**
+ * key=region path, value=Set of members
+ */
private final Map<String, Set<InternalDistributedMember>> regionExceptions;
- /** List of exceptions that were unexpected and caused the tx to fail */
+ /**
+ * List of exceptions that were unexpected and caused the tx to fail
+ */
private final Map fatalExceptions;
private final TXId id;
@@ -2236,7 +2261,7 @@ public class TXCommitMessage extends PooledDistributionMessage
RegionCommitList rcl = memberMap.getValue();
for (RegionCommit region : rcl) {
Set<InternalDistributedMember> failedMembers =
- regionExceptions.get(region.r.getFullPath());
+ regionExceptions.get(region.internalRegion.getFullPath());
if (failedMembers != null && failedMembers.contains(member)) {
markMemberOffline(member, region);
}
@@ -2275,11 +2300,12 @@ public class TXCommitMessage extends PooledDistributionMessage
// if we have started to shutdown, we don't want to mark the peer
// as offline, or we will think we have newer data when in fact we don't
- region.r.getCancelCriterion().checkCancelInProgress(null);
+ region.internalRegion.getCancelCriterion().checkCancelInProgress(null);
// Otherwise, mark the peer as offline, because it didn't complete
// the operation.
- ((DistributedRegion) region.r).getPersistenceAdvisor().markMemberOffline(member,
+ ((DistributedRegion) region.internalRegion).getPersistenceAdvisor().markMemberOffline(
+ member,
persistentId);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java
index fc5bf1f..a061b34 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java
@@ -73,10 +73,6 @@ public class TXRegionState {
LocalizedStrings.TXRegionState_OPERATIONS_ON_GLOBAL_REGIONS_ARE_NOT_ALLOWED_BECAUSE_THIS_THREAD_HAS_AN_ACTIVE_TRANSACTION
.toLocalizedString());
}
- if (r.hasServerProxy()) {
- // throw new
- // UnsupportedOperationException(LocalizedStrings.TXRegionState_OPERATIONS_ON_REGION_WITH_CLIENT_POOL_ARE_NOT_ALLOWED_BECAUSE_THIS_THREAD_HAS_AN_ACTIVE_TRANSACTION.toLocalizedString());
- }
this.entryMods = new HashMap<Object, TXEntryState>();
this.uaMods = null;
this.region = r;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/TxCommitMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/TxCommitMessageTest.java
new file mode 100755
index 0000000..489b895
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/TxCommitMessageTest.java
@@ -0,0 +1,1167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.Sequence;
+import org.jmock.lib.concurrent.Synchroniser;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.VersionedDataStream;
+import org.apache.geode.internal.cache.versions.VersionSource;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+/**
+ * Unit tests for TxCommitMessage.
+ */
+@Category(UnitTest.class)
+public class TxCommitMessageTest {
+
+ private Mockery mockContext;
+
+ @Before
+ public void setUp() {
+ mockContext = new Mockery() {
+ {
+ setImposteriser(ClassImposteriser.INSTANCE);
+ setThreadingPolicy(new Synchroniser());
+ }
+ };
+ }
+
+ @After
+ public void tearDown() {
+ mockContext.assertIsSatisfied();
+ mockContext = null;
+ }
+
+ public interface VersionedDataOutput extends DataOutput, VersionedDataStream {
+
+ }
+
+ public interface VersionedDataInput extends DataInput, VersionedDataStream {
+
+ }
+
+ @Test
+ public void toDataWithShadowKeyPre180Server() throws IOException {
+ final Sequence toData = mockContext.sequence("toData");
+ final VersionedDataOutput mockDataOutput = mockContext.mock(VersionedDataOutput.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockDataOutput).getVersion();
+ will(returnValue(Version.GEODE_170));
+ // processor id
+ oneOf(mockDataOutput).writeInt(with(any(int.class)));
+ inSequence(toData);
+ // txId.uniqId
+ oneOf(mockDataOutput).writeInt(0);
+ inSequence(toData);
+ // lockId
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // totalMaxSize
+ oneOf(mockDataOutput).writeInt(0);
+ inSequence(toData);
+ // txState.membershipId
+ oneOf(mockDataOutput).writeByte(-1);
+ inSequence(toData);
+ // txState.baseThreadId
+ oneOf(mockDataOutput).writeLong(with(any(long.class)));
+ inSequence(toData);
+ // txState.baseSequenceId
+ oneOf(mockDataOutput).writeLong(with(any(long.class)));
+ inSequence(toData);
+ // txState.needsLargeModCount
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // regionsSize
+ oneOf(mockDataOutput).writeInt(1);
+ inSequence(toData);
+
+ // regionPath: "/r"
+ oneOf(mockDataOutput).writeByte(87);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeShort(2);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBytes("/r");
+ inSequence(toData);
+ // parentRegionPath: null string
+ oneOf(mockDataOutput).writeByte(69);
+ inSequence(toData);
+ // opKeys size
+ oneOf(mockDataOutput).writeInt(1);
+ inSequence(toData);
+ // needsLargeModCount
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // versionMember
+ oneOf(mockDataOutput).writeByte(1);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(1);
+ inSequence(toData);
+ // opKeys[0]
+ oneOf(mockDataOutput).writeByte(87);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeShort(3);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBytes("key");
+ inSequence(toData);
+ // farSideData[0]
+ oneOf(mockDataOutput).writeByte(17);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(0);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ // shadowkey
+ oneOf(mockDataOutput).writeLong(-1L);
+ inSequence(toData);
+ // offset
+ oneOf(mockDataOutput).writeInt(with(any(int.class)));
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+
+ // bridgeContext
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ // farSiders
+ oneOf(mockDataOutput).writeByte(-1);
+ inSequence(toData);
+ }
+ });
+
+ final InternalDistributedMember mockInternalDistributedMember =
+ createInternalDistributedMember();
+
+ final TXId txId = new TXId(mockInternalDistributedMember, 0);
+ final TXState txState = new TXState(null, false);
+ final TXCommitMessage txCommitMessage = new TXCommitMessage(txId, null, txState);
+
+ final InternalRegion mockInternalRegion =
+ createMockInternalRegion(mockInternalDistributedMember);
+ txCommitMessage.startRegion(mockInternalRegion, 0);
+ final EntryEventImpl mockEntryEventImpl = createMockEntryEvent(mockInternalRegion);
+ final TXEntryState txEntryState = createTxEntryState(mockInternalRegion, mockEntryEventImpl);
+ txCommitMessage.addOp(null, "key", txEntryState, null);
+ txCommitMessage.finishRegionComplete();
+
+ txCommitMessage.toData(mockDataOutput);
+ }
+
+ @Test
+ public void toDataWithoutShadowKeyPre180Client() throws IOException {
+ final Sequence toData = mockContext.sequence("toData");
+ final VersionedDataOutput mockDataOutput = mockContext.mock(VersionedDataOutput.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockDataOutput).getVersion();
+ will(returnValue(Version.GEODE_170));
+ // processor id
+ oneOf(mockDataOutput).writeInt(with(any(int.class)));
+ inSequence(toData);
+ // txId.uniqId
+ oneOf(mockDataOutput).writeInt(0);
+ inSequence(toData);
+ // lockId
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // totalMaxSize
+ oneOf(mockDataOutput).writeInt(0);
+ inSequence(toData);
+ // txState.membershipId
+ oneOf(mockDataOutput).writeByte(-1);
+ inSequence(toData);
+ // txState.baseThreadId
+ oneOf(mockDataOutput).writeLong(with(any(long.class)));
+ inSequence(toData);
+ // txState.baseSequenceId
+ oneOf(mockDataOutput).writeLong(with(any(long.class)));
+ inSequence(toData);
+ // txState.needsLargeModCount
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // regionsSize
+ oneOf(mockDataOutput).writeInt(1);
+ inSequence(toData);
+
+ // regionPath: "/r"
+ oneOf(mockDataOutput).writeByte(87);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeShort(2);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBytes("/r");
+ inSequence(toData);
+ // parentRegionPath: null string
+ oneOf(mockDataOutput).writeByte(69);
+ inSequence(toData);
+ // opKeys size
+ oneOf(mockDataOutput).writeInt(1);
+ inSequence(toData);
+ // needsLargeModCount
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // versionMember
+ oneOf(mockDataOutput).writeByte(1);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(1);
+ inSequence(toData);
+ // opKeys[0]
+ oneOf(mockDataOutput).writeByte(87);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeShort(3);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBytes("key");
+ inSequence(toData);
+ // farSideData[0]
+ oneOf(mockDataOutput).writeByte(17);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(0);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ // no shadowkey
+ // offset
+ oneOf(mockDataOutput).writeInt(with(any(int.class)));
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+
+ // bridgeContext
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ // farSiders
+ oneOf(mockDataOutput).writeByte(-1);
+ inSequence(toData);
+ }
+ });
+
+ final InternalDistributedMember mockInternalDistributedMember =
+ createInternalDistributedMember();
+
+ final TXId txId = new TXId(mockInternalDistributedMember, 0);
+ final TXState txState = new TXState(null, false);
+ final TXCommitMessage txCommitMessage = new TXCommitMessage(txId, null, txState);
+ txCommitMessage.setClientVersion(Version.GEODE_170);
+
+ final InternalRegion mockInternalRegion =
+ createMockInternalRegion(mockInternalDistributedMember);
+ txCommitMessage.startRegion(mockInternalRegion, 0);
+ final EntryEventImpl mockEntryEventImpl = createMockEntryEvent(mockInternalRegion);
+ final TXEntryState txEntryState = createTxEntryState(mockInternalRegion, mockEntryEventImpl);
+ txCommitMessage.addOp(null, "key", txEntryState, null);
+ txCommitMessage.finishRegionComplete();
+
+ txCommitMessage.toData(mockDataOutput);
+ }
+
+ @Test
+ public void toDataWithShadowKeyPost180Server() throws IOException {
+ final Sequence toData = mockContext.sequence("toData");
+ final VersionedDataOutput mockDataOutput = mockContext.mock(VersionedDataOutput.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockDataOutput).getVersion();
+ will(returnValue(Version.CURRENT));
+ // processor id
+ oneOf(mockDataOutput).writeInt(with(any(int.class)));
+ inSequence(toData);
+ // txId.uniqId
+ oneOf(mockDataOutput).writeInt(0);
+ inSequence(toData);
+ // lockId
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // totalMaxSize
+ oneOf(mockDataOutput).writeInt(0);
+ inSequence(toData);
+ // txState.membershipId
+ oneOf(mockDataOutput).writeByte(-1);
+ inSequence(toData);
+ // txState.baseThreadId
+ oneOf(mockDataOutput).writeLong(with(any(long.class)));
+ inSequence(toData);
+ // txState.baseSequenceId
+ oneOf(mockDataOutput).writeLong(with(any(long.class)));
+ inSequence(toData);
+ // txState.needsLargeModCount
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // hasShadowKeys
+ oneOf(mockDataOutput).writeBoolean(true);
+ inSequence(toData);
+ // regionsSize
+ oneOf(mockDataOutput).writeInt(1);
+ inSequence(toData);
+
+ // regionPath: "/r"
+ oneOf(mockDataOutput).writeByte(87);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeShort(2);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBytes("/r");
+ inSequence(toData);
+ // parentRegionPath: null string
+ oneOf(mockDataOutput).writeByte(69);
+ inSequence(toData);
+ // opKeys size
+ oneOf(mockDataOutput).writeInt(1);
+ inSequence(toData);
+ // needsLargeModCount
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // versionMember
+ oneOf(mockDataOutput).writeByte(1);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(1);
+ inSequence(toData);
+ // opKeys[0]
+ oneOf(mockDataOutput).writeByte(87);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeShort(3);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBytes("key");
+ inSequence(toData);
+ // farSideData[0]
+ oneOf(mockDataOutput).writeByte(17);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(0);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ // shadowkey
+ oneOf(mockDataOutput).writeLong(-1L);
+ inSequence(toData);
+ // offset
+ oneOf(mockDataOutput).writeInt(with(any(int.class)));
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+
+ // bridgeContext
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ // farSiders
+ oneOf(mockDataOutput).writeByte(-1);
+ inSequence(toData);
+ }
+ });
+
+ final InternalDistributedMember mockInternalDistributedMember =
+ createInternalDistributedMember();
+
+ final TXId txId = new TXId(mockInternalDistributedMember, 0);
+ final TXState txState = new TXState(null, false);
+ final TXCommitMessage txCommitMessage = new TXCommitMessage(txId, null, txState);
+
+ final InternalRegion mockInternalRegion =
+ createMockInternalRegion(mockInternalDistributedMember);
+ txCommitMessage.startRegion(mockInternalRegion, 0);
+ final EntryEventImpl mockEntryEventImpl = createMockEntryEvent(mockInternalRegion);
+ final TXEntryState txEntryState = createTxEntryState(mockInternalRegion, mockEntryEventImpl);
+ txCommitMessage.addOp(null, "key", txEntryState, null);
+ txCommitMessage.finishRegionComplete();
+
+ txCommitMessage.toData(mockDataOutput);
+ }
+
+ @Test
+ public void toDataWithoutShadowKeyPost180Client() throws IOException {
+ final Sequence toData = mockContext.sequence("toData");
+ final VersionedDataOutput mockDataOutput = mockContext.mock(VersionedDataOutput.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockDataOutput).getVersion();
+ will(returnValue(Version.CURRENT));
+ // processor id
+ oneOf(mockDataOutput).writeInt(with(any(int.class)));
+ inSequence(toData);
+ // txId.uniqId
+ oneOf(mockDataOutput).writeInt(0);
+ inSequence(toData);
+ // lockId
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // totalMaxSize
+ oneOf(mockDataOutput).writeInt(0);
+ inSequence(toData);
+ // txState.membershipId
+ oneOf(mockDataOutput).writeByte(-1);
+ inSequence(toData);
+ // txState.baseThreadId
+ oneOf(mockDataOutput).writeLong(with(any(long.class)));
+ inSequence(toData);
+ // txState.baseSequenceId
+ oneOf(mockDataOutput).writeLong(with(any(long.class)));
+ inSequence(toData);
+ // txState.needsLargeModCount
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // hasShadowKeys
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // regionsSize
+ oneOf(mockDataOutput).writeInt(1);
+ inSequence(toData);
+
+ // regionPath: "/r"
+ oneOf(mockDataOutput).writeByte(87);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeShort(2);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBytes("/r");
+ inSequence(toData);
+ // parentRegionPath: null string
+ oneOf(mockDataOutput).writeByte(69);
+ inSequence(toData);
+ // opKeys size
+ oneOf(mockDataOutput).writeInt(1);
+ inSequence(toData);
+ // needsLargeModCount
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+ // versionMember
+ oneOf(mockDataOutput).writeByte(1);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(1);
+ inSequence(toData);
+ // opKeys[0]
+ oneOf(mockDataOutput).writeByte(87);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeShort(3);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBytes("key");
+ inSequence(toData);
+ // farSideData[0]
+ oneOf(mockDataOutput).writeByte(17);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(0);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ // no shadowkey
+ // offset
+ oneOf(mockDataOutput).writeInt(with(any(int.class)));
+ inSequence(toData);
+ oneOf(mockDataOutput).writeBoolean(false);
+ inSequence(toData);
+
+ // bridgeContext
+ oneOf(mockDataOutput).writeByte(41);
+ inSequence(toData);
+ // farSiders
+ oneOf(mockDataOutput).writeByte(-1);
+ inSequence(toData);
+ }
+ });
+
+ final InternalDistributedMember mockInternalDistributedMember =
+ createInternalDistributedMember();
+
+ final TXId txId = new TXId(mockInternalDistributedMember, 0);
+ final TXState txState = new TXState(null, false);
+ final TXCommitMessage txCommitMessage = new TXCommitMessage(txId, null, txState);
+ txCommitMessage.setClientVersion(Version.CURRENT);
+
+ final InternalRegion mockInternalRegion =
+ createMockInternalRegion(mockInternalDistributedMember);
+ txCommitMessage.startRegion(mockInternalRegion, 0);
+ final EntryEventImpl mockEntryEventImpl = createMockEntryEvent(mockInternalRegion);
+ final TXEntryState txEntryState = createTxEntryState(mockInternalRegion, mockEntryEventImpl);
+ txCommitMessage.addOp(null, "key", txEntryState, null);
+ txCommitMessage.finishRegionComplete();
+
+ txCommitMessage.toData(mockDataOutput);
+ }
+
+ @Test
+ public void fromDataWithShadowKeyPre180Server() throws Exception {
+ final Sequence fromData = mockContext.sequence("fromData");
+ final VersionedDataInput mockDataInput = mockContext.mock(VersionedDataInput.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockDataInput).getVersion();
+ will(returnValue(Version.GEODE_170));
+ // processor id
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ // txId.uniqId
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ // member version
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedByte();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedByte();
+ will(returnValue(1));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ // durableId
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 87));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedShort();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput)
+ .readFully(with(any(byte[].class)), with(any(int.class)), with(any(int.class)));
+ inSequence(fromData);
+
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 0));
+ inSequence(fromData);
+
+ // lockId
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+ // totalMaxSize
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ // txState.membershipId
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ // txState.baseThreadId
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ // txState.baseSequenceId
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ // txState.needsLargeModCount
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+
+ // regionsSize
+ oneOf(mockDataInput).readInt();
+ will(returnValue(1));
+ inSequence(fromData);
+
+ // regionPath: "/r"
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 87));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedShort();
+ will(returnValue(2));
+ inSequence(fromData);
+ oneOf(mockDataInput)
+ .readFully(with(any(byte[].class)), with(any(int.class)), with(any(int.class)));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ // opKeys size
+ oneOf(mockDataInput).readInt();
+ will(returnValue(1));
+ inSequence(fromData);
+ // needsLargeModCount
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+ // versionMember
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+
+ // opKeys[0]
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 87));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedShort();
+ will(returnValue(3));
+ inSequence(fromData);
+ oneOf(mockDataInput)
+ .readFully(with(any(byte[].class)), with(any(int.class)), with(any(int.class)));
+ inSequence(fromData);
+ // farSideData[0]
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 17));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ // shadowkey
+ oneOf(mockDataInput).readLong();
+ will(returnValue(-1L));
+ inSequence(fromData);
+ // offset
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+
+ // bridgeContext
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ // farSiders
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ }
+ });
+
+ final DistributionManager mockDistributionManager =
+ mockContext.mock(DistributionManager.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockDistributionManager).getCache();
+ will(returnValue(null));
+ allowing(mockDistributionManager).isLoner();
+ will(returnValue(false));
+ }
+ });
+
+ final TXCommitMessage txCommitMessage = new TXCommitMessage();
+ txCommitMessage.setDM(mockDistributionManager);
+ txCommitMessage.fromData(mockDataInput);
+ }
+
+ @Test
+ public void fromDataWithShadowKeyPost180Server() throws Exception {
+ final Sequence fromData = mockContext.sequence("fromData");
+ final DataInput mockDataInput = mockContext.mock(DataInput.class);
+ mockContext.checking(new Expectations() {
+ {
+ // processor id
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ // txId.uniqId
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ // member version
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedByte();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedByte();
+ will(returnValue(1));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ // durableId
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 87));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedShort();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput)
+ .readFully(with(any(byte[].class)), with(any(int.class)), with(any(int.class)));
+ inSequence(fromData);
+
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 0));
+ inSequence(fromData);
+
+ // lockId
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+ // totalMaxSize
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ // txState.membershipId
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ // txState.baseThreadId
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ // txState.baseSequenceId
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ // txState.needsLargeModCount
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+
+ // hasShadowKeys
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(true));
+ inSequence(fromData);
+
+ // regionsSize
+ oneOf(mockDataInput).readInt();
+ will(returnValue(1));
+ inSequence(fromData);
+
+ // regionPath: "/r"
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 87));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedShort();
+ will(returnValue(2));
+ inSequence(fromData);
+ oneOf(mockDataInput)
+ .readFully(with(any(byte[].class)), with(any(int.class)), with(any(int.class)));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ // opKeys size
+ oneOf(mockDataInput).readInt();
+ will(returnValue(1));
+ inSequence(fromData);
+ // needsLargeModCount
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+ // versionMember
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+
+ // opKeys[0]
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 87));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedShort();
+ will(returnValue(3));
+ inSequence(fromData);
+ oneOf(mockDataInput)
+ .readFully(with(any(byte[].class)), with(any(int.class)), with(any(int.class)));
+ inSequence(fromData);
+ // farSideData[0]
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 17));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ // shadowkey
+ oneOf(mockDataInput).readLong();
+ will(returnValue(-1L));
+ inSequence(fromData);
+ // offset
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+
+ // bridgeContext
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ // farSiders
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ }
+ });
+
+ final DistributionManager mockDistributionManager =
+ mockContext.mock(DistributionManager.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockDistributionManager).getCache();
+ will(returnValue(null));
+ allowing(mockDistributionManager).isLoner();
+ will(returnValue(false));
+ }
+ });
+
+ final TXCommitMessage txCommitMessage = new TXCommitMessage();
+ txCommitMessage.setDM(mockDistributionManager);
+ txCommitMessage.fromData(mockDataInput);
+ }
+
+ @Test
+ public void fromDataWithoutShadowKeyPost180Client() throws Exception {
+ final Sequence fromData = mockContext.sequence("fromData");
+ final DataInput mockDataInput = mockContext.mock(DataInput.class);
+ mockContext.checking(new Expectations() {
+ {
+ // processor id
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ // txId.uniqId
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ // member version
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedByte();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedByte();
+ will(returnValue(1));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ // durableId
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 87));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedShort();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput)
+ .readFully(with(any(byte[].class)), with(any(int.class)), with(any(int.class)));
+ inSequence(fromData);
+
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 0));
+ inSequence(fromData);
+
+ // lockId
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+ // totalMaxSize
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ // txState.membershipId
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ // txState.baseThreadId
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ // txState.baseSequenceId
+ oneOf(mockDataInput).readLong();
+ will(returnValue(0L));
+ inSequence(fromData);
+ // txState.needsLargeModCount
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+
+ // hasShadowKeys
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+
+ // regionsSize
+ oneOf(mockDataInput).readInt();
+ will(returnValue(1));
+ inSequence(fromData);
+
+ // regionPath: "/r"
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 87));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedShort();
+ will(returnValue(2));
+ inSequence(fromData);
+ oneOf(mockDataInput)
+ .readFully(with(any(byte[].class)), with(any(int.class)), with(any(int.class)));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 69));
+ inSequence(fromData);
+ // opKeys size
+ oneOf(mockDataInput).readInt();
+ will(returnValue(1));
+ inSequence(fromData);
+ // needsLargeModCount
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+ // versionMember
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+
+ // opKeys[0]
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 87));
+ inSequence(fromData);
+ oneOf(mockDataInput).readUnsignedShort();
+ will(returnValue(3));
+ inSequence(fromData);
+ oneOf(mockDataInput)
+ .readFully(with(any(byte[].class)), with(any(int.class)), with(any(int.class)));
+ inSequence(fromData);
+ // farSideData[0]
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 17));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ // no shadowkey
+ // offset
+ oneOf(mockDataInput).readInt();
+ will(returnValue(0));
+ inSequence(fromData);
+ oneOf(mockDataInput).readBoolean();
+ will(returnValue(false));
+ inSequence(fromData);
+
+ // bridgeContext
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) 41));
+ inSequence(fromData);
+ // farSiders
+ oneOf(mockDataInput).readByte();
+ will(returnValue((byte) -1));
+ inSequence(fromData);
+ }
+ });
+
+ final DistributionManager mockDistributionManager =
+ mockContext.mock(DistributionManager.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockDistributionManager).getCache();
+ will(returnValue(null));
+ allowing(mockDistributionManager).isLoner();
+ will(returnValue(true));
+ }
+ });
+
+ final TXCommitMessage txCommitMessage = new TXCommitMessage();
+ txCommitMessage.setDM(mockDistributionManager);
+ txCommitMessage.fromData(mockDataInput);
+ }
+
+ private InternalDistributedMember createInternalDistributedMember() throws IOException {
+ final InternalDistributedMember mockInternalDistributedMember =
+ mockContext.mock(InternalDistributedMember.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockInternalDistributedMember).getDSFID();
+ will(returnValue(1));
+ allowing(mockInternalDistributedMember).toData(with(any(DataOutput.class)));
+ allowing(mockInternalDistributedMember).getSerializationVersions();
+ will(returnValue(null));
+ }
+ });
+ return mockInternalDistributedMember;
+ }
+
+ private EntryEventImpl createMockEntryEvent(InternalRegion mockInternalRegion) {
+ final EntryEventImpl mockEntryEventImpl = mockContext.mock(EntryEventImpl.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockEntryEventImpl).isLocalInvalid();
+ will(returnValue(false));
+ allowing(mockEntryEventImpl).getRegion();
+ will(returnValue(mockInternalRegion));
+ ignoring(mockEntryEventImpl).putValueTXEntry(with(any(TXEntryState.class)));
+ ignoring(mockEntryEventImpl)
+ .setTXEntryOldValue(with(any(Object.class)), with(any(boolean.class)));
+ }
+ });
+ return mockEntryEventImpl;
+ }
+
+ private TXEntryState createTxEntryState(InternalRegion mockInternalRegion,
+ EntryEventImpl mockEntryEventImpl) {
+ final TXState txState = new TXState(null, false);
+ final TXRegionState txRegionState = new TXRegionState(mockInternalRegion, txState);
+ final TXEntryState txEntryState =
+ TXEntryState.getFactory().createEntry(null, null, null, null, txRegionState, false);
+ txEntryState.invalidate(mockEntryEventImpl);
+ txEntryState.generateEventOffsets(txState);
+ return txEntryState;
+ }
+
+ private InternalRegion createMockInternalRegion(VersionSource mockVersionSource) {
+ final InternalRegion mockInternalRegion = mockContext.mock(InternalRegion.class);
+ mockContext.checking(new Expectations() {
+ {
+ allowing(mockInternalRegion).requiresReliabilityCheck();
+ will(returnValue(false));
+ allowing(mockInternalRegion).getVersionMember();
+ will(returnValue(mockVersionSource));
+ allowing(mockInternalRegion).getFullPath();
+ will(returnValue("/r"));
+ allowing(mockInternalRegion).getPersistBackup();
+ will(returnValue(false));
+ allowing(mockInternalRegion).getScope();
+ will(returnValue(Scope.LOCAL));
+ allowing(mockInternalRegion).isEntryEvictionPossible();
+ will(returnValue(false));
+ allowing(mockInternalRegion).isEntryExpiryPossible();
+ will(returnValue(false));
+ ignoring(mockInternalRegion).setInUseByTransaction(true);
+ allowing(mockInternalRegion).getConcurrencyChecksEnabled();
+ will(returnValue(false));
+ }
+ });
+ return mockInternalRegion;
+ }
+
+}
diff --git a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 33e476c..0975d5a 100644
--- a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1311,8 +1311,8 @@ fromData,20
toData,9
org/apache/geode/internal/cache/TXCommitMessage,2
-fromData,211
-toData,292
+fromData,236
+toData,316
org/apache/geode/internal/cache/TXCommitMessage$CommitProcessForLockIdMessage,2
fromData,26
@@ -1331,8 +1331,8 @@ fromData,16
toData,16
org/apache/geode/internal/cache/TXCommitMessage$RegionCommit,2
-fromData,173
-toData,77
+fromData,164
+toData,82
org/apache/geode/internal/cache/TXCommitMessage$RegionCommit$FarSideEntryOp,2
fromData,171
--
To stop receiving notification emails like this one, please contact
jbarrett@apache.org.