You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2017/05/08 23:16:00 UTC
[22/46] geode git commit: GEODE-2632: change dependencies on
GemFireCacheImpl to InternalCache
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index e6bee71..662b557 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -12,14 +12,12 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -65,6 +63,7 @@ import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.locks.TXLockId;
import org.apache.geode.internal.cache.locks.TXLockIdImpl;
import org.apache.geode.internal.cache.locks.TXLockService;
+import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.versions.VersionSource;
@@ -80,7 +79,6 @@ import org.apache.geode.internal.offheap.annotations.Released;
* commit, to other cache members.
*
* @since GemFire 4.0
- *
*/
public class TXCommitMessage extends PooledDistributionMessage
implements MembershipListener, MessageWithReply {
@@ -98,12 +96,10 @@ public class TXCommitMessage extends PooledDistributionMessage
protected transient DM dm; // Used on the sending side of this message
private transient int sequenceNum = 0;
- private transient HashMap<InternalDistributedMember, RegionCommitList> msgMap = null; // Maps
- // receiver
- // Serializables
- // to
- // RegionCommitList
- // instances
+
+ // Maps receiver Serializables to RegionCommitList instances
+ private transient HashMap<InternalDistributedMember, RegionCommitList> msgMap = null;
+
private transient RegionCommit currentRegion;
protected transient TXState txState = null;
private transient boolean wasProcessed;
@@ -124,7 +120,7 @@ public class TXCommitMessage extends PooledDistributionMessage
private transient boolean hasReliableRegions = false;
/** Set of all caching exceptions produced while processing this tx */
- private transient Set processingExceptions = Collections.EMPTY_SET;
+ private transient Set processingExceptions = Collections.emptySet();
private transient ClientProxyMembershipID bridgeContext = null;
@@ -149,15 +145,6 @@ public class TXCommitMessage extends PooledDistributionMessage
* transaction
*/
public static final TXCommitMessage EXCEPTION_MSG = new TXCommitMessage();
- /*
- * /** this is set if this message should deserialize the WAN shadowKey sent by the sender. Sender
- * will not send shadowKeys when there is a mix of 7.0 and 7.0.1 members
- *
- * private transient boolean shouldReadShadowKey; /** this is set if the sender has decided to
- * send WAN shadowKey for 7.0.1 members
- *
- * private transient boolean shouldWriteShadowKey;
- */
public TXCommitMessage(TXId txIdent, DM dm, TXState txState) {
this.dm = dm;
@@ -176,7 +163,7 @@ public class TXCommitMessage extends PooledDistributionMessage
// zero arg constructor for DataSerializer
}
- static public TXFarSideCMTracker getTracker() {
+ public static TXFarSideCMTracker getTracker() {
return TXCommitMessage.txTracker;
}
@@ -194,7 +181,7 @@ public class TXCommitMessage extends PooledDistributionMessage
* Return the TXCommitMessage we have already received that is associated with id. Note because of
* bug 37657 we may need to wait for it to show up.
*/
- static public TXCommitMessage waitForMessage(Object id, DM dm) {
+ public static TXCommitMessage waitForMessage(Object id, DM dm) {
TXFarSideCMTracker map = getTracker();
return map.waitForMessage(id, dm);
}
@@ -210,12 +197,10 @@ public class TXCommitMessage extends PooledDistributionMessage
// make sure we have some changes and someone to send them to
if (!this.currentRegion.isEmpty() && s != null && !s.isEmpty()) {
// Get the persistent ids for the current region and save them
- Map<InternalDistributedMember, PersistentMemberID> persistentIds =
- getPersistentIds(this.currentRegion.r);
- this.currentRegion.persistentIds = persistentIds;
+ this.currentRegion.persistentIds = getPersistentIds(this.currentRegion.r);
if (this.msgMap == null) {
- this.msgMap = new HashMap<InternalDistributedMember, RegionCommitList>();
+ this.msgMap = new HashMap<>();
}
{
RegionCommitList newRCL = null;
@@ -245,18 +230,18 @@ public class TXCommitMessage extends PooledDistributionMessage
}
}
}
- { // Now deal with each existing recipient that does not care
- // about this region
- Iterator<Map.Entry<InternalDistributedMember, RegionCommitList>> it =
- this.msgMap.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<InternalDistributedMember, RegionCommitList> me = it.next();
- if (!s.contains(me.getKey())) {
- RegionCommitList rcl = me.getValue();
- RegionCommitList trimmedRcl = rcl.trim(this.currentRegion);
- if (trimmedRcl != rcl) {
- me.setValue(trimmedRcl);
- }
+
+ // Now deal with each existing recipient that does not care
+ // about this region
+ Iterator<Map.Entry<InternalDistributedMember, RegionCommitList>> it =
+ this.msgMap.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<InternalDistributedMember, RegionCommitList> me = it.next();
+ if (!s.contains(me.getKey())) {
+ RegionCommitList rcl = me.getValue();
+ RegionCommitList trimmedRcl = rcl.trim(this.currentRegion);
+ if (trimmedRcl != rcl) {
+ me.setValue(trimmedRcl);
}
}
}
@@ -264,13 +249,11 @@ public class TXCommitMessage extends PooledDistributionMessage
this.currentRegion = null;
}
-
-
private Map<InternalDistributedMember, PersistentMemberID> getPersistentIds(LocalRegion r) {
if (r instanceof DistributedRegion) {
- return ((DistributedRegion) r).getCacheDistributionAdvisor().advisePersistentMembers();
+ return ((CacheDistributionAdvisee) r).getCacheDistributionAdvisor().advisePersistentMembers();
} else {
- return Collections.EMPTY_MAP;
+ return Collections.emptyMap();
}
}
@@ -287,17 +270,15 @@ public class TXCommitMessage extends PooledDistributionMessage
this.currentRegion = null;
}
-
Map viewVersions = new HashMap();
private Boolean needsLargeModCount;
private transient boolean disableListeners = false;
-
/** record CacheDistributionAdvisor.startOperation versions for later cleanup */
protected void addViewVersion(DistributedRegion dr, long version) {
- viewVersions.put(dr, Long.valueOf(version));
+ viewVersions.put(dr, version);
}
protected void releaseViewVersions() {
@@ -309,7 +290,7 @@ public class TXCommitMessage extends PooledDistributionMessage
// need to continue the iteration if one of the regions is destroyed
// since others may still be okay
try {
- long newv = dr.getDistributionAdvisor().endOperation(viewVersion.longValue());
+ long newv = dr.getDistributionAdvisor().endOperation(viewVersion);
} catch (RuntimeException ex) {
rte = ex;
}
@@ -484,13 +465,13 @@ public class TXCommitMessage extends PooledDistributionMessage
}
Set cacheClosedMembers =
- (processor == null) ? Collections.EMPTY_SET : processor.getCacheClosedMembers();
+ (processor == null) ? Collections.emptySet() : processor.getCacheClosedMembers();
Set departedMembers =
- (processor == null) ? Collections.EMPTY_SET : processor.getDepartedMembers();
+ (processor == null) ? Collections.emptySet() : processor.getDepartedMembers();
// check reliability on each region
- Set regionDistributionExceptions = Collections.EMPTY_SET;
- Set failedRegionNames = Collections.EMPTY_SET;
+ Set regionDistributionExceptions = Collections.emptySet();
+ Set failedRegionNames = Collections.emptySet();
for (Iterator iter = regionToRecipients.entrySet().iterator(); iter.hasNext();) {
Map.Entry me = (Map.Entry) iter.next();
final RegionCommit rc = (RegionCommit) me.getKey();
@@ -499,7 +480,7 @@ public class TXCommitMessage extends PooledDistributionMessage
successfulRecipients.removeAll(departedMembers);
// remove members who destroyed that region or closed their cache
- Set regionDestroyedMembers = (processor == null) ? Collections.EMPTY_SET
+ Set regionDestroyedMembers = (processor == null) ? Collections.emptySet()
: processor.getRegionDestroyedMembers(rc.r.getFullPath());
successfulRecipients.removeAll(cacheClosedMembers);
@@ -508,7 +489,7 @@ public class TXCommitMessage extends PooledDistributionMessage
try {
rc.r.handleReliableDistribution(successfulRecipients);
} catch (RegionDistributionException e) {
- if (regionDistributionExceptions == Collections.EMPTY_SET) {
+ if (regionDistributionExceptions == Collections.emptySet()) {
regionDistributionExceptions = new HashSet();
failedRegionNames = new HashSet();
}
@@ -525,11 +506,10 @@ public class TXCommitMessage extends PooledDistributionMessage
}
}
-
/**
* Helper method for send
*/
- private final void setRecipientsSendData(Set recipients, ReplyProcessor21 processor,
+ private void setRecipientsSendData(Set recipients, ReplyProcessor21 processor,
RegionCommitList rcl) {
setRecipients(recipients);
this.regions = rcl;
@@ -581,22 +561,21 @@ public class TXCommitMessage extends PooledDistributionMessage
this.farSideEntryOps.add(entryOp);
}
- protected final void addProcessingException(Exception e) {
+ protected void addProcessingException(Exception e) {
// clear all previous exceptions if e is a CacheClosedException
- if (this.processingExceptions == Collections.EMPTY_SET || e instanceof CancelException) {
+ if (this.processingExceptions == Collections.emptySet() || e instanceof CancelException) {
this.processingExceptions = new HashSet();
}
this.processingExceptions.add(e);
}
-
public void setDM(DM dm) {
this.dm = dm;
}
public DM getDM() {
if (this.dm == null) {
- GemFireCacheImpl cache = GemFireCacheImpl.getExisting("Applying TXCommit");
+ InternalCache cache = GemFireCacheImpl.getExisting("Applying TXCommit");
this.dm = cache.getDistributionManager();
}
return this.dm;
@@ -619,12 +598,9 @@ public class TXCommitMessage extends PooledDistributionMessage
if (logger.isDebugEnabled()) {
logger.debug("begin processing TXCommitMessage for {}", this.txIdent);
}
+ // do this before CacheFactory.getInstance for bug 33471
final int oldLevel =
- LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); // do this
- // before
- // CacheFactory.getInstance
- // for bug
- // 33471
+ LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
boolean forceListener = false; // this gets flipped if we need to fire tx listener
// it needs to default to false because we don't want to fire listeners on pr replicates
try {
@@ -642,20 +618,18 @@ public class TXCommitMessage extends PooledDistributionMessage
try {
// Pre-process each Region in the tx
try {
- {
- Iterator it = this.regions.iterator();
- while (it.hasNext()) {
- boolean failedBeginProcess = true;
- RegionCommit rc = (RegionCommit) it.next();
- try {
- failedBeginProcess = !rc.beginProcess(dm, this.txIdent, txEvent);
- } catch (CacheRuntimeException problem) {
- processCacheRuntimeException(problem);
- } finally {
- if (failedBeginProcess) {
- rc.r = null; // Cause related FarSideEntryOps to skip processing
- it.remove(); // Skip endProcessing as well
- }
+ Iterator it = this.regions.iterator();
+ while (it.hasNext()) {
+ boolean failedBeginProcess = true;
+ RegionCommit rc = (RegionCommit) it.next();
+ try {
+ failedBeginProcess = !rc.beginProcess(dm, this.txIdent, txEvent);
+ } catch (CacheRuntimeException problem) {
+ processCacheRuntimeException(problem);
+ } finally {
+ if (failedBeginProcess) {
+ rc.r = null; // Cause related FarSideEntryOps to skip processing
+ it.remove(); // Skip endProcessing as well
}
}
}
@@ -726,22 +700,20 @@ public class TXCommitMessage extends PooledDistributionMessage
}
public void basicProcessOps() {
- {
- List<EntryEventImpl> pendingCallbacks = new ArrayList<>(this.farSideEntryOps.size());
- Collections.sort(this.farSideEntryOps);
- Iterator it = this.farSideEntryOps.iterator();
- while (it.hasNext()) {
- try {
- RegionCommit.FarSideEntryOp entryOp = (RegionCommit.FarSideEntryOp) it.next();
- entryOp.process(pendingCallbacks);
- } catch (CacheRuntimeException problem) {
- processCacheRuntimeException(problem);
- } catch (Exception e) {
- addProcessingException(e);
- }
+ List<EntryEventImpl> pendingCallbacks = new ArrayList<>(this.farSideEntryOps.size());
+ Collections.sort(this.farSideEntryOps);
+ Iterator it = this.farSideEntryOps.iterator();
+ while (it.hasNext()) {
+ try {
+ RegionCommit.FarSideEntryOp entryOp = (RegionCommit.FarSideEntryOp) it.next();
+ entryOp.process(pendingCallbacks);
+ } catch (CacheRuntimeException problem) {
+ processCacheRuntimeException(problem);
+ } catch (Exception e) {
+ addProcessingException(e);
}
- firePendingCallbacks(pendingCallbacks);
}
+ firePendingCallbacks(pendingCallbacks);
}
private void firePendingCallbacks(List<EntryEventImpl> callbacks) {
@@ -943,7 +915,7 @@ public class TXCommitMessage extends PooledDistributionMessage
@Override
public String toString() {
- StringBuffer result = new StringBuffer(256);
+ StringBuilder result = new StringBuilder(256);
result.append("TXCommitMessage@").append(System.identityHashCode(this)).append("#")
.append(this.sequenceNum).append(" processorId=").append(this.processorId).append(" txId=")
.append(this.txIdent);
@@ -974,7 +946,6 @@ public class TXCommitMessage extends PooledDistributionMessage
* that represents an entire transaction. At commit time the txCommitMessage sent to each node can
* be a subset of the transaction, this method will combine those subsets into a complete message.
*
- * @param msgSet
* @return the complete txCommitMessage
*/
public static TXCommitMessage combine(Set<TXCommitMessage> msgSet) {
@@ -994,8 +965,6 @@ public class TXCommitMessage extends PooledDistributionMessage
/**
* Combines the other TXCommitMessage into this message. Used to compute complete TXCommitMessage
* from parts.
- *
- * @param other
*/
public void combine(TXCommitMessage other) {
assert other != null;
@@ -1012,7 +981,7 @@ public class TXCommitMessage extends PooledDistributionMessage
}
}
- public final static class RegionCommitList extends ArrayList<RegionCommit> {
+ public static class RegionCommitList extends ArrayList<RegionCommit> {
private static final long serialVersionUID = -8910813949027683641L;
private transient boolean needsAck = false;
private transient RegionCommit trimRC = null;
@@ -1071,7 +1040,7 @@ public class TXCommitMessage extends PooledDistributionMessage
@Override
public String toString() {
- StringBuffer result = new StringBuffer(256);
+ StringBuilder result = new StringBuilder(256);
result.append('@').append(System.identityHashCode(this)).append(' ').append(super.toString());
return result.toString();
}
@@ -1370,8 +1339,6 @@ public class TXCommitMessage extends PooledDistributionMessage
}
}
-
-
boolean isEmpty() {
return this.opKeys == null;
}
@@ -1424,7 +1391,7 @@ public class TXCommitMessage extends PooledDistributionMessage
@Override
public String toString() {
- StringBuffer result = new StringBuffer(64);
+ StringBuilder result = new StringBuilder(64);
if (this.regionPath != null) {
result.append(this.regionPath);
} else {
@@ -1440,8 +1407,7 @@ public class TXCommitMessage extends PooledDistributionMessage
if (this.r != null) {
DataSerializer.writeString(this.r.getFullPath(), out);
if (this.r instanceof BucketRegion) {
- DataSerializer.writeString(((BucketRegion) this.r).getPartitionedRegion().getFullPath(),
- out);
+ DataSerializer.writeString(((Bucket) this.r).getPartitionedRegion().getFullPath(), out);
} else {
DataSerializer.writeString(null, out);
}
@@ -1537,7 +1503,6 @@ public class TXCommitMessage extends PooledDistributionMessage
* @param in the data input that is used to read the data for this entry op
* @param largeModCount true if the mod count is a int instead of a byte.
* @param readShadowKey true if a long shadowKey should be read
- * @throws ClassNotFoundException
*/
public void fromData(DataInput in, boolean largeModCount, boolean readShadowKey)
throws IOException, ClassNotFoundException {
@@ -1645,7 +1610,7 @@ public class TXCommitMessage extends PooledDistributionMessage
}
}
- final Object getTrackerKey() {
+ Object getTrackerKey() {
if (this.lockId != null) {
return this.lockId;
} else {
@@ -1657,41 +1622,39 @@ public class TXCommitMessage extends PooledDistributionMessage
* Used to prevent processing of the message if we have reported to other FarSiders that we did
* not receive the CommitProcessMessage
*/
- final boolean dontProcess() {
+ boolean dontProcess() {
return this.dontProcess;
}
/**
* Indicate that this message should not be processed if we receive CommitProcessMessage (late)
*/
- final void setDontProcess() {
+ void setDontProcess() {
this.dontProcess = true;
}
- final boolean isProcessing() {
+ boolean isProcessing() {
return this.isProcessing;
}
- private final void setIsProcessing(boolean isProcessing) {
+ private void setIsProcessing(boolean isProcessing) {
this.isProcessing = isProcessing;
}
- final boolean wasProcessed() {
+ boolean wasProcessed() {
return this.wasProcessed;
}
- final void setProcessed(boolean wasProcessed) {
+ void setProcessed(boolean wasProcessed) {
this.wasProcessed = wasProcessed;
}
- /********************* Region Commit Process Messages ***************************************/
-
/**
* The CommitProcessForLockIdMessage is sent by the Distributed ACK TX origin to the recipients
* (aka FarSiders) to indicate that a previously received RegionCommit that contained a lockId
* should commence processing.
*/
- static final public class CommitProcessForLockIdMessage extends CommitProcessMessage {
+ public static class CommitProcessForLockIdMessage extends CommitProcessMessage {
private TXLockId lockId;
public CommitProcessForLockIdMessage() {
@@ -1729,7 +1692,7 @@ public class TXCommitMessage extends PooledDistributionMessage
@Override
public String toString() {
- StringBuffer result = new StringBuffer(128);
+ StringBuilder result = new StringBuilder(128);
result.append("CommitProcessForLockIdMessage@").append(System.identityHashCode(this))
.append(" lockId=").append(this.lockId);
return result.toString();
@@ -1743,7 +1706,7 @@ public class TXCommitMessage extends PooledDistributionMessage
* typically sent if all the TX changes are a result of load/netsearch/netload values (thus no
* lockid)
*/
- static final public class CommitProcessForTXIdMessage extends CommitProcessMessage {
+ public static class CommitProcessForTXIdMessage extends CommitProcessMessage {
private TXId txId;
public CommitProcessForTXIdMessage() {
@@ -1781,14 +1744,15 @@ public class TXCommitMessage extends PooledDistributionMessage
@Override
public String toString() {
- StringBuffer result = new StringBuffer(128);
+ StringBuilder result = new StringBuilder(128);
result.append("CommitProcessForTXIdMessage@").append(System.identityHashCode(this))
.append(" txId=").append(this.txId);
return result.toString();
}
}
- static abstract public class CommitProcessMessage extends PooledDistributionMessage {
- protected final void basicProcess(final TXCommitMessage mess, final DistributionManager dm) {
+
+ public abstract static class CommitProcessMessage extends PooledDistributionMessage {
+ protected void basicProcess(final TXCommitMessage mess, final DistributionManager dm) {
dm.removeMembershipListener(mess);
synchronized (mess) {
if (mess.dontProcess()) {
@@ -1803,8 +1767,6 @@ public class TXCommitMessage extends PooledDistributionMessage
}
}
- /********************* Commit Process Query Message ***************************************/
-
/**
* The CommitProcessQueryMessage is used to attempt to recover - in the Distributed ACK TXs - when
* the origin of the CommitProcess messages departed from the distributed system. The sender of
@@ -1815,9 +1777,8 @@ public class TXCommitMessage extends PooledDistributionMessage
* about the tracker key - opting not to have specific messages for each type like
* CommitProcessFor<Lock/TX>Id - and take the performance penalty of an extra call to
* DataSerializer
- *
*/
- static final public class CommitProcessQueryMessage extends PooledDistributionMessage {
+ public static class CommitProcessQueryMessage extends PooledDistributionMessage {
private Object trackerKey; // Either a TXLockId or a TXId
private int processorId;
@@ -1865,7 +1826,7 @@ public class TXCommitMessage extends PooledDistributionMessage
@Override
public String toString() {
- StringBuffer result = new StringBuffer(128);
+ StringBuilder result = new StringBuilder(128);
result.append("CommitProcessQueryMessage@").append(System.identityHashCode(this))
.append(" trackerKeyClass=").append(this.trackerKey.getClass().getName())
.append(" trackerKey=").append(this.trackerKey).append(" processorId=")
@@ -1875,7 +1836,7 @@ public class TXCommitMessage extends PooledDistributionMessage
}
/********************* Commit Process Query Response Message **********************************/
- static final public class CommitProcessQueryReplyMessage extends ReplyMessage {
+ public static class CommitProcessQueryReplyMessage extends ReplyMessage {
private boolean wasReceived;
public CommitProcessQueryReplyMessage(boolean wasReceived) {
@@ -1909,7 +1870,7 @@ public class TXCommitMessage extends PooledDistributionMessage
@Override
public String toString() {
- StringBuffer result = new StringBuffer(128);
+ StringBuilder result = new StringBuilder(128);
result.append("CommitProcessQueryReplyMessage@").append(System.identityHashCode(this))
.append(" wasReceived=").append(this.wasReceived).append(" processorId=")
.append(this.processorId).append(" from ").append(this.getSender());
@@ -1918,7 +1879,7 @@ public class TXCommitMessage extends PooledDistributionMessage
}
/********************* Commit Process Query Response Processor *********************************/
- static final public class CommitProcessQueryReplyProcessor extends ReplyProcessor21 {
+ public static class CommitProcessQueryReplyProcessor extends ReplyProcessor21 {
public boolean receivedOnePositive;
CommitProcessQueryReplyProcessor(DM dm, Set members) {
@@ -1936,17 +1897,19 @@ public class TXCommitMessage extends PooledDistributionMessage
}
@Override
- final protected boolean canStopWaiting() {
+ protected boolean canStopWaiting() {
return this.receivedOnePositive;
}
- final public boolean receivedACommitProcessMessage() {
+ public boolean receivedACommitProcessMessage() {
return this.receivedOnePositive;
}
}
/********************* MembershipListener Implementation ***************************************/
- public void memberJoined(InternalDistributedMember id) {}
+ public void memberJoined(InternalDistributedMember id) {
+ // do nothing
+ }
public void memberSuspect(InternalDistributedMember id, InternalDistributedMember whoSuspected,
String reason) {}
@@ -2098,58 +2061,6 @@ public class TXCommitMessage extends PooledDistributionMessage
}
}
- // /** Custom subclass that keeps all ReplyExceptions */
- // private class ReliableCommitReplyProcessor extends ReliableReplyProcessor21 {
- //
- // /** Set of members that threw CacheClosedExceptions */
- // private Set cacheExceptions = new HashSet();
- // /** key=region path, value=Set of members */
- // private Map regionExceptions = new HashMap();
- //
- // public ReliableCommitReplyProcessor(DM dm,
- // Set initMembers) {
- // super(dm, initMembers);
- // }
- // protected synchronized void processException(DistributionMessage msg,
- // ReplyException re) {
- // // only interested in CommitReplyException
- // if (re instanceof CommitReplyException) {
- // CommitReplyException cre = (CommitReplyException) re;
- // Set exceptions = cre.getExceptions();
- // for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
- // Exception ex = (Exception) iter.next();
- // if (ex instanceof CacheClosedException) {
- // cacheExceptions.add(msg.getSender());
- // }
- // else if (ex instanceof RegionDestroyedException) {
- // String r = ((RegionDestroyedException)ex).getRegionFullPath();
- // Set members = (Set) regionExceptions.get(r);
- // if (members == null) {
- // members = new HashSet();
- // regionExceptions.put(r, members);
- // }
- // members.add(msg.getSender());
- // }
- // }
- // }
- // else {
- // // allow superclass to handle all other exceptions
- // super.processException(msg, re);
- // }
- // }
- // // these two accessors should be called after wait for replies completes
- // protected Set getCacheClosedMembers() {
- // return this.cacheExceptions;
- // }
- // protected Set getRegionDestroyedMembers(String regionFullPath) {
- // Set members = (Set) this.regionExceptions.get(regionFullPath);
- // if (members == null) {
- // members = Collections.EMPTY_SET;
- // }
- // return members;
- // }
- // }
-
/**
* Reply processor which collects all CommitReplyExceptions and emits a detailed failure exception
* if problems occur
@@ -2203,7 +2114,7 @@ public class TXCommitMessage extends PooledDistributionMessage
(CommitExceptionCollectingException) this.exception;
return cce.getCacheClosedMembers();
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
@@ -2213,7 +2124,7 @@ public class TXCommitMessage extends PooledDistributionMessage
(CommitExceptionCollectingException) this.exception;
return cce.getRegionDestroyedMembers(regionFullPath);
} else {
- return Collections.EMPTY_SET;
+ return Collections.emptySet();
}
}
}
@@ -2244,14 +2155,12 @@ public class TXCommitMessage extends PooledDistributionMessage
/**
* Determine if the commit processing was incomplete, if so throw a detailed exception
* indicating the source of the problem
- *
- * @param msgMap
*/
public void handlePotentialCommitFailure(
HashMap<InternalDistributedMember, RegionCommitList> msgMap) {
if (fatalExceptions.size() > 0) {
- StringBuffer errorMessage = new StringBuffer("Incomplete commit of transaction ").append(id)
- .append(". Caused by the following exceptions: ");
+ StringBuilder errorMessage = new StringBuilder("Incomplete commit of transaction ")
+ .append(id).append(". Caused by the following exceptions: ");
for (Iterator i = fatalExceptions.entrySet().iterator(); i.hasNext();) {
Map.Entry me = (Map.Entry) i.next();
DistributedMember mem = (DistributedMember) me.getKey();
@@ -2346,16 +2255,13 @@ public class TXCommitMessage extends PooledDistributionMessage
public Set getRegionDestroyedMembers(String regionFullPath) {
Set members = (Set) this.regionExceptions.get(regionFullPath);
if (members == null) {
- members = Collections.EMPTY_SET;
+ members = Collections.emptySet();
}
return members;
}
/**
* Protected by (this)
- *
- * @param member
- * @param exceptions
*/
public void addExceptionsFromMember(InternalDistributedMember member, Set exceptions) {
for (Iterator iter = exceptions.iterator(); iter.hasNext();) {
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
index 2948a48..a0a4d7c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXManagerImpl.java
@@ -12,16 +12,52 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.logging.log4j.Logger;
+
import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
-import org.apache.geode.cache.*;
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.CommitConflictException;
+import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionId;
+import org.apache.geode.cache.TransactionInDoubtException;
+import org.apache.geode.cache.TransactionListener;
+import org.apache.geode.cache.TransactionWriter;
+import org.apache.geode.cache.UnsupportedOperationInTransactionException;
import org.apache.geode.distributed.TXManagerCancelledException;
-import org.apache.geode.distributed.internal.*;
+import org.apache.geode.distributed.internal.DM;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.SystemTimer.SystemTimerTask;
import org.apache.geode.internal.cache.tier.sockets.Message;
@@ -32,30 +68,14 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap.MapCallback;
-import org.apache.logging.log4j.Logger;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
/**
- * <p>
* The internal implementation of the {@link CacheTransactionManager} interface returned by
- * {@link GemFireCacheImpl#getCacheTransactionManager}. Internal operations
+ * {@link InternalCache#getCacheTransactionManager}. Internal operations
*
- * </code>TransactionListener</code> invocation, Region synchronization, transaction statistics and
+ * {@code TransactionListener} invocation, Region synchronization, transaction statistics and
*
* transaction logging are handled here
- *
*
* @since GemFire 4.0
*
@@ -67,12 +87,14 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
// Thread specific context container
private final ThreadLocal<TXStateProxy> txContext;
+
private static TXManagerImpl currentInstance = null;
+
// The unique transaction ID for this Manager
private final AtomicInteger uniqId;
private final DM dm;
- private final Cache cache;
+ private final InternalCache cache;
// The DistributionMemberID used to construct TXId's
private final InternalDistributedMember distributionMgrId;
@@ -86,8 +108,10 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
*/
public static final int NOTX = -1;
- private final ArrayList<TransactionListener> txListeners = new ArrayList<TransactionListener>(8);
+ private final List<TransactionListener> txListeners = new ArrayList<>(8);
+
public TransactionWriter writer = null;
+
private volatile boolean closed = false;
private final Map<TXId, TXStateProxy> hostedTXStates;
@@ -95,7 +119,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* the number of client initiated transactions to store for client failover
*/
- public final static int FAILOVER_TX_MAP_SIZE =
+ public static final int FAILOVER_TX_MAP_SIZE =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "transactionFailoverMapSize", 1000);
/**
@@ -106,6 +130,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
@SuppressWarnings("unchecked")
private Map<TXId, TXCommitMessage> failoverMap =
Collections.synchronizedMap(new LinkedHashMap<TXId, TXCommitMessage>() {
+ // TODO: inner class is serializable but outer class is not
private static final long serialVersionUID = -4156018226167594134L;
protected boolean removeEldestEntry(Entry eldest) {
@@ -114,7 +139,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
eldest.getKey(), (size() > FAILOVER_TX_MAP_SIZE));
}
return size() > FAILOVER_TX_MAP_SIZE;
- };
+ }
});
/**
@@ -126,8 +151,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* this keeps track of all the transactions that were initiated locally.
*/
- private ConcurrentMap<TXId, TXStateProxy> localTxMap =
- new ConcurrentHashMap<TXId, TXStateProxy>();
+ private ConcurrentMap<TXId, TXStateProxy> localTxMap = new ConcurrentHashMap<>();
/**
* the time in minutes after which any suspended transaction are rolled back. default is 30
@@ -152,49 +176,44 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* Constructor that implements the {@link CacheTransactionManager} interface. Only only one
* instance per {@link org.apache.geode.cache.Cache}
- *
- * @param cachePerfStats
*/
- public TXManagerImpl(CachePerfStats cachePerfStats, Cache cache) {
+ public TXManagerImpl(CachePerfStats cachePerfStats, InternalCache cache) {
this.cache = cache;
this.dm = ((InternalDistributedSystem) cache.getDistributedSystem()).getDistributionManager();
this.distributionMgrId = this.dm.getDistributionManagerId();
this.uniqId = new AtomicInteger(0);
this.cachePerfStats = cachePerfStats;
- this.hostedTXStates = new HashMap<TXId, TXStateProxy>();
- this.txContext = new ThreadLocal<TXStateProxy>();
- this.isTXDistributed = new ThreadLocal<Boolean>();
+ this.hostedTXStates = new HashMap<>();
+ this.txContext = new ThreadLocal<>();
+ this.isTXDistributed = new ThreadLocal<>();
this.transactionTimeToLive = Integer
.getInteger(DistributionConfig.GEMFIRE_PREFIX + "cacheServer.transactionTimeToLive", 180);
currentInstance = this;
}
- final Cache getCache() {
+ InternalCache getCache() {
return this.cache;
}
-
/**
* Get the TransactionWriter for the cache
*
* @return the current TransactionWriter
* @see TransactionWriter
*/
- public final TransactionWriter getWriter() {
+ public TransactionWriter getWriter() {
return writer;
}
-
- public final void setWriter(TransactionWriter writer) {
- if (((GemFireCacheImpl) this.cache).isClient()) {
+ public void setWriter(TransactionWriter writer) {
+ if (this.cache.isClient()) {
throw new IllegalStateException(
LocalizedStrings.TXManager_NO_WRITER_ON_CLIENT.toLocalizedString());
}
this.writer = writer;
}
-
- public final TransactionListener getListener() {
+ public TransactionListener getListener() {
synchronized (this.txListeners) {
if (this.txListeners.isEmpty()) {
return null;
@@ -280,7 +299,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
}
}
- final CachePerfStats getCachePerfStats() {
+ CachePerfStats getCachePerfStats() {
return this.cachePerfStats;
}
@@ -396,7 +415,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
noteCommitSuccess(opStart, lifeTime, tx);
}
- final void noteCommitFailure(long opStart, long lifeTime, TXStateInterface tx) {
+ void noteCommitFailure(long opStart, long lifeTime, TXStateInterface tx) {
long opEnd = CachePerfStats.getStatTime();
this.cachePerfStats.txFailure(opEnd - opStart, lifeTime, tx.getChanges());
TransactionListener[] listeners = getListeners();
@@ -428,7 +447,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
}
}
- final void noteCommitSuccess(long opStart, long lifeTime, TXStateInterface tx) {
+ void noteCommitSuccess(long opStart, long lifeTime, TXStateInterface tx) {
long opEnd = CachePerfStats.getStatTime();
this.cachePerfStats.txSuccess(opEnd - opStart, lifeTime, tx.getChanges());
TransactionListener[] listeners = getListeners();
@@ -497,7 +516,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
noteRollbackSuccess(opStart, lifeTime, tx);
}
- final void noteRollbackSuccess(long opStart, long lifeTime, TXStateInterface tx) {
+ void noteRollbackSuccess(long opStart, long lifeTime, TXStateInterface tx) {
long opEnd = CachePerfStats.getStatTime();
this.cachePerfStats.txRollback(opEnd - opStart, lifeTime, tx.getChanges());
TransactionListener[] listeners = getListeners();
@@ -597,7 +616,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
return retVal;
}
- public final void setTXState(TXStateProxy val) {
+ public void setTXState(TXStateProxy val) {
txContext.set(val);
}
@@ -625,11 +644,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
for (TXStateProxy proxy : this.localTxMap.values()) {
proxy.close();
}
- {
- TransactionListener[] listeners = getListeners();
- for (int i = 0; i < listeners.length; i++) {
- closeListener(listeners[i]);
- }
+ TransactionListener[] listeners = getListeners();
+ for (int i = 0; i < listeners.length; i++) {
+ closeListener(listeners[i]);
}
}
@@ -660,7 +677,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
* @return the state of the transaction or null. Pass this value to {@link TXManagerImpl#resume}
* to reactivate the suspended transaction.
*/
- public final TXStateProxy internalSuspend() {
+ public TXStateProxy internalSuspend() {
TXStateProxy result = getTXState();
if (result != null) {
result.suspend();
@@ -691,26 +708,26 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* @deprecated use internalResume instead
*/
- public final void resume(TXStateProxy tx) {
+ @Deprecated
+ public void resume(TXStateProxy tx) {
internalResume(tx);
}
- public final boolean isClosed() {
+ public boolean isClosed() {
return this.closed;
}
- private final void checkClosed() {
+ private void checkClosed() {
cache.getCancelCriterion().checkCancelInProgress(null);
if (this.closed) {
throw new TXManagerCancelledException("This transaction manager is closed.");
}
}
- final DM getDM() {
+ DM getDM() {
return this.dm;
}
-
public static int getCurrentTXUniqueId() {
if (currentInstance == null) {
return NOTX;
@@ -718,9 +735,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
return currentInstance.getMyTXUniqueId();
}
-
-
- public final static TXStateProxy getCurrentTXState() {
+ public static TXStateProxy getCurrentTXState() {
if (currentInstance == null) {
return null;
}
@@ -747,9 +762,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
* on the txState, on which this thread operates. Some messages like SizeMessage should not create
* a new txState.
*
- * @param msg
* @return {@link TXStateProxy} the txProxy for the transactional message
- * @throws InterruptedException
*/
public TXStateProxy masqueradeAs(TransactionMessage msg) throws InterruptedException {
if (msg.getTXUniqId() == NOTX || !msg.canParticipateInTransaction()) {
@@ -828,11 +841,8 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
* on the txState, on which this thread operates. Some messages like SizeMessage should not create
* a new txState.
*
- * @param msg
- * @param memberId
* @param probeOnly - do not masquerade; just look up the TX state
* @return {@link TXStateProxy} the txProxy for the transactional message
- * @throws InterruptedException
*/
public TXStateProxy masqueradeAs(Message msg, InternalDistributedMember memberId,
boolean probeOnly) throws InterruptedException {
@@ -846,8 +856,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
synchronized (this.hostedTXStates) {
val = this.hostedTXStates.get(key);
if (val == null) {
- // [sjigyasu] TODO: Conditionally create object based on distributed or non-distributed tx
- // mode
+ // TODO: Conditionally create object based on distributed or non-distributed tx mode
if (msg instanceof TransactionMessage
&& ((TransactionMessage) msg).isTransactionDistributed()) {
val = new DistTXStateProxyImplOnDatanode(this, key, memberId);
@@ -894,8 +903,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* Remove the association created by {@link #masqueradeAs(TransactionMessage)}
- *
- * @param tx
*/
public void unmasquerade(TXStateProxy tx) {
if (tx != null) {
@@ -907,7 +914,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* Cleanup the remote txState after commit and rollback
*
- * @param txId
* @return the TXStateProxy
*/
public TXStateProxy removeHostedTXState(TXId txId) {
@@ -942,7 +948,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* Used to verify if a transaction with a given id is hosted by this txManager.
*
- * @param txId
* @return true if the transaction is in progress, false otherwise
*/
public boolean isHostedTxInProgress(TXId txId) {
@@ -1104,7 +1109,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
* If the given transaction is already being completed by another thread this will wait for that
* completion to finish and will ensure that the result is saved in the client failover map.
*
- * @param txId
* @return true if a wait was performed
*/
public boolean waitForCompletingTransaction(TXId txId) {
@@ -1132,7 +1136,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* Returns the TXCommitMessage for a transaction that has been successfully completed.
*
- * @param txId
* @return the commit message or an exception token e.g {@link TXCommitMessage#CMT_CONFLICT_MSG}
* if the transaction threw an exception
* @see #isExceptionToken(TXCommitMessage)
@@ -1142,7 +1145,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
}
/**
- * @param msg
* @return true if msg is an exception token, false otherwise
*/
public boolean isExceptionToken(TXCommitMessage msg) {
@@ -1158,7 +1160,6 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
* during transaction execution.
*
* @param msg the token that represents the exception
- * @param txId
* @return the exception
*/
public RuntimeException getExceptionForToken(TXCommitMessage msg, TXId txId) {
@@ -1209,13 +1210,12 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
@Override
protected void process(DistributionManager dm) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
TXManagerImpl mgr = cache.getTXMgr();
mgr.removeTransactions(this.txIds, false);
}
}
-
}
private ConcurrentMap<TransactionId, TXStateProxy> suspendedTXs =
@@ -1290,8 +1290,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
SystemTimerTask task = this.expiryTasks.remove(txProxy.getTransactionId());
if (task != null) {
if (task.cancel()) {
- GemFireCacheImpl cache = (GemFireCacheImpl) this.cache;
- cache.purgeCCPTimer();
+ this.cache.purgeCCPTimer();
}
}
}
@@ -1300,8 +1299,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
* this map keeps track of all the threads that are waiting in
* {@link #tryResume(TransactionId, long, TimeUnit)} for a particular transactionId
*/
- private ConcurrentMap<TransactionId, Queue<Thread>> waitMap =
- new ConcurrentHashMap<TransactionId, Queue<Thread>>();
+ private ConcurrentMap<TransactionId, Queue<Thread>> waitMap = new ConcurrentHashMap<>();
public boolean tryResume(TransactionId transactionId, long time, TimeUnit unit) {
if (transactionId == null || getTXState() != null || !exists(transactionId)) {
@@ -1383,11 +1381,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* schedules the transaction to expire after {@link #suspendedTXTimeout}
*
- * @param txId
* @param expiryTimeUnit the time unit to use when scheduling the expiration
*/
private void scheduleExpiry(TransactionId txId, TimeUnit expiryTimeUnit) {
- final GemFireCacheImpl cache = (GemFireCacheImpl) this.cache;
if (suspendedTXTimeout < 0) {
if (logger.isDebugEnabled()) {
logger.debug("TX: transaction: {} not scheduled to expire", txId);
@@ -1452,9 +1448,13 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
}
private static class RefCountMapEntry
implements HashEntry<AbstractRegionEntry, RefCountMapEntry> {
+
private final AbstractRegionEntry key;
+
private HashEntry<AbstractRegionEntry, RefCountMapEntry> next;
+
private volatile int refCount;
+
private static final AtomicIntegerFieldUpdater<RefCountMapEntry> refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(RefCountMapEntry.class, "refCount");
@@ -1561,7 +1561,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
}
};
- public static final void incRefCount(AbstractRegionEntry re) {
+ public static void incRefCount(AbstractRegionEntry re) {
TXManagerImpl mgr = currentInstance;
if (mgr != null) {
mgr.refCountMap.create(re, incCallback, null, null, true);
@@ -1571,7 +1571,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
/**
* Return true if refCount went to zero.
*/
- public static final boolean decRefCount(AbstractRegionEntry re) {
+ public static boolean decRefCount(AbstractRegionEntry re) {
TXManagerImpl mgr = currentInstance;
if (mgr != null) {
return mgr.refCountMap.removeConditionally(re, decCallback, null, null) != null;
@@ -1628,9 +1628,9 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
}
};
try {
- ((GemFireCacheImpl) this.cache).getCCPTimer().schedule(task, timeout);
+ this.cache.getCCPTimer().schedule(task, timeout);
} catch (IllegalStateException ise) {
- if (!((GemFireCacheImpl) this.cache).isClosed()) {
+ if (!this.cache.isClosed()) {
throw ise;
}
// task not able to be scheduled due to cache is closing,
@@ -1716,7 +1716,7 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
LocalizedStrings.TXManagerImpl_CANNOT_CHANGE_TRANSACTION_MODE_WHILE_TRANSACTIONS_ARE_IN_PROGRESS
.toLocalizedString());
} else {
- isTXDistributed.set(new Boolean(flag));
+ isTXDistributed.set(flag);
}
}
@@ -1726,14 +1726,13 @@ public class TXManagerImpl implements CacheTransactionManager, MembershipListene
* default value of this property.
*/
public boolean isDistributed() {
-
Boolean value = isTXDistributed.get();
// This can be null if not set in setDistributed().
if (value == null) {
InternalDistributedSystem ids = (InternalDistributedSystem) cache.getDistributedSystem();
return ids.getOriginalConfig().getDistributedTransactions();
} else {
- return value.booleanValue();
+ return value;
}
}
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java
index fd53fb1..24cbaa2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXMessage.java
@@ -39,20 +39,20 @@ import org.apache.geode.internal.cache.partitioned.PartitionMessage;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-/**
- *
- *
- */
public abstract class TXMessage extends SerialDistributionMessage
implements MessageWithReply, TransactionMessage {
private static final Logger logger = LogService.getLogger();
private int processorId;
+
private int txUniqId;
+
private InternalDistributedMember txMemberId = null;
- public TXMessage() {}
+ public TXMessage() {
+ // nothing
+ }
public TXMessage(int txUniqueId, InternalDistributedMember onBehalfOfMember,
ReplyProcessor21 processor) {
@@ -73,7 +73,7 @@ public abstract class TXMessage extends SerialDistributionMessage
if (logger.isDebugEnabled()) {
logger.debug("processing {}", this);
}
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (checkCacheClosing(cache) || checkDSClosing(cache.getInternalDistributedSystem())) {
thr = new CacheClosedException(LocalizedStrings.PartitionMessage_REMOTE_CACHE_IS_CLOSED_0
.toLocalizedString(dm.getId()));
@@ -130,7 +130,7 @@ public abstract class TXMessage extends SerialDistributionMessage
return distributedSystem == null || distributedSystem.isDisconnecting();
}
- private boolean checkCacheClosing(GemFireCacheImpl cache) {
+ private boolean checkCacheClosing(InternalCache cache) {
return cache == null || cache.isClosed();
}
@@ -160,7 +160,7 @@ public abstract class TXMessage extends SerialDistributionMessage
* Transaction operations override this method to do actual work
*
* @param txId The transaction Id to operate on
- * @return true if {@link TXMessage} should send a reply false otherwise
+ * @return true if TXMessage should send a reply false otherwise
*/
protected abstract boolean operateOnTx(TXId txId, DistributionManager dm)
throws RemoteOperationException;
@@ -192,7 +192,6 @@ public abstract class TXMessage extends SerialDistributionMessage
return txMemberId;
}
-
@Override
public int getProcessorId() {
return this.processorId;
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java
index eefa27c..1e586aa 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionLockRequestImpl.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache;
import java.io.DataInput;
@@ -26,23 +25,23 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.cache.locks.TXRegionLockRequest;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
/**
* TXRegionLockRequest represents all the locks that need to be made for a single region.
*
- *
* @since GemFire 4.0
- *
*/
-public class TXRegionLockRequestImpl
- implements org.apache.geode.internal.cache.locks.TXRegionLockRequest {
+public class TXRegionLockRequestImpl implements TXRegionLockRequest {
private static final long serialVersionUID = 5840033961584078082L;
private static final Logger logger = LogService.getLogger();
private transient LocalRegion r;
+
private String regionPath;
+
private Set<Object> entryKeys;
public TXRegionLockRequestImpl() {
@@ -93,26 +92,26 @@ public class TXRegionLockRequestImpl
this.entryKeys.add(key);
}
- public final void fromData(DataInput in) throws IOException, ClassNotFoundException {
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
this.regionPath = DataSerializer.readString(in);
- final GemFireCacheImpl cache = getCache(false);
+ final InternalCache cache = getCache(false);
try {
final int size = InternalDataSerializer.readArrayLength(in);
if (cache != null && size > 0) {
this.r = (LocalRegion) cache.getRegion(this.regionPath);
}
this.entryKeys = readEntryKeySet(size, in);
- } catch (CacheClosedException cce) {
+ } catch (CacheClosedException ignore) {
// don't throw in deserialization
this.entryKeys = null;
}
}
- private final Set<Object> readEntryKeySet(final int size, final DataInput in)
+ private Set<Object> readEntryKeySet(final int size, final DataInput in)
throws IOException, ClassNotFoundException {
- if (logger.isDebugEnabled()) {
+ if (logger.isTraceEnabled()) {
logger.trace(LogMarker.SERIALIZER, "Reading HashSet with size {}", size);
}
@@ -135,21 +134,21 @@ public class TXRegionLockRequestImpl
InternalDataSerializer.writeSet(this.entryKeys, out);
}
- public static final TXRegionLockRequestImpl createFromData(DataInput in)
+ public static TXRegionLockRequestImpl createFromData(DataInput in)
throws IOException, ClassNotFoundException {
TXRegionLockRequestImpl result = new TXRegionLockRequestImpl();
InternalDataSerializer.invokeFromData(result, in);
return result;
}
- public final String getRegionFullPath() {
+ public String getRegionFullPath() {
if (this.regionPath == null) {
this.regionPath = this.r.getFullPath();
}
return this.regionPath;
}
- public final Set<Object> getKeys() {
+ public Set<Object> getKeys() {
if (this.entryKeys == null) {
// check for cache closed/closing
getCache(true);
@@ -157,8 +156,8 @@ public class TXRegionLockRequestImpl
return this.entryKeys;
}
- private final GemFireCacheImpl getCache(boolean throwIfClosing) {
- final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ private InternalCache getCache(boolean throwIfClosing) {
+ final InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null && !cache.isClosed()) {
if (throwIfClosing) {
cache.getCancelCriterion().checkCancelInProgress(null);
@@ -175,7 +174,7 @@ public class TXRegionLockRequestImpl
* Only safe to call in the vm that creates this request. Once it is serialized this method will
* return null.
*/
- public final LocalRegion getLocalRegion() {
+ public LocalRegion getLocalRegion() {
return this.r;
}
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java
index 167f1c1..496a812 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRegionState.java
@@ -97,7 +97,7 @@ public class TXRegionState {
public TXEntryState createReadEntry(LocalRegion r, Object entryKey, RegionEntry re, Object vId,
Object pendingValue) {
- GemFireCacheImpl cache = r.getCache();
+ InternalCache cache = r.getCache();
boolean isDistributed = false;
if (cache.getTxManager().getTXState() != null) {
isDistributed = cache.getTxManager().getTXState().isDistTx();
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
index 725ad64..6a1eeed 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteCommitMessage.java
@@ -40,16 +40,13 @@ import org.apache.geode.internal.cache.RemoteOperationMessage.RemoteOperationRes
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
-/**
- *
- *
- */
public class TXRemoteCommitMessage extends TXMessage {
-
private static final Logger logger = LogService.getLogger();
/** for deserialization */
- public TXRemoteCommitMessage() {}
+ public TXRemoteCommitMessage() {
+ // nothing
+ }
public TXRemoteCommitMessage(int txUniqId, InternalDistributedMember onBehalfOfClientMember,
ReplyProcessor21 processor) {
@@ -76,7 +73,7 @@ public class TXRemoteCommitMessage extends TXMessage {
@Override
protected boolean operateOnTx(TXId txId, DistributionManager dm) throws RemoteOperationException {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
TXManagerImpl txMgr = cache.getTXMgr();
if (logger.isDebugEnabled()) {
@@ -124,8 +121,6 @@ public class TXRemoteCommitMessage extends TXMessage {
return true;
}
-
-
/**
* This message is used for the reply to a remote commit operation: a commit from a stub to the tx
* host. This is the reply to a {@link TXRemoteCommitMessage}.
@@ -133,7 +128,9 @@ public class TXRemoteCommitMessage extends TXMessage {
* @since GemFire 6.5
*/
public static final class TXRemoteCommitReplyMessage extends ReplyMessage {
+
private transient TXCommitMessage commitMessage;
+
/*
* Used on the fromData side to transfer the value bytes to the requesting thread
*/
@@ -142,7 +139,9 @@ public class TXRemoteCommitMessage extends TXMessage {
/**
* Empty constructor to conform to DataSerializable interface
*/
- public TXRemoteCommitReplyMessage() {}
+ public TXRemoteCommitReplyMessage() {
+ // nothing
+ }
public TXRemoteCommitReplyMessage(DataInput in) throws IOException, ClassNotFoundException {
fromData(in);
@@ -219,7 +218,7 @@ public class TXRemoteCommitMessage extends TXMessage {
@Override
public String toString() {
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
sb.append("TXRemoteCommitReplyMessage ").append("processorid=").append(this.processorId)
.append(" reply to sender ").append(this.getSender());
return sb.toString();
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java
index e416e11..13b783f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXRemoteRollbackMessage.java
@@ -30,15 +30,12 @@ import org.apache.geode.distributed.internal.membership.InternalDistributedMembe
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
-/**
- *
- *
- */
public class TXRemoteRollbackMessage extends TXMessage {
-
private static final Logger logger = LogService.getLogger();
- public TXRemoteRollbackMessage() {}
+ public TXRemoteRollbackMessage() {
+ // nothing
+ }
public TXRemoteRollbackMessage(int txUniqId, InternalDistributedMember onBehalfOfClientMember,
ReplyProcessor21 processor) {
@@ -60,7 +57,7 @@ public class TXRemoteRollbackMessage extends TXMessage {
@Override
protected boolean operateOnTx(TXId txId, DistributionManager dm) {
- GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+ InternalCache cache = GemFireCacheImpl.getInstance();
if (cache == null) {
throw new CacheClosedException(
LocalizedStrings.CacheFactory_A_CACHE_HAS_NOT_YET_BEEN_CREATED.toLocalizedString());
http://git-wip-us.apache.org/repos/asf/geode/blob/557a127b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
index 49922a0..22b95f3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXStateProxyImpl.java
@@ -12,9 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-/**
- * File comment
- */
package org.apache.geode.internal.cache;
import java.util.Collection;
@@ -48,11 +45,7 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-/**
- *
- */
public class TXStateProxyImpl implements TXStateProxy {
-
private static final Logger logger = LogService.getLogger();
protected static final AtomicBoolean txDistributedClientWarningIssued = new AtomicBoolean();
@@ -63,6 +56,7 @@ public class TXStateProxyImpl implements TXStateProxy {
protected DistributedMember target;
private boolean commitRequestedByOwner;
private boolean isJCATransaction;
+
/**
* for client/server JTA transactions we need to have a single thread handle both beforeCompletion
* and afterCompletion so that beforeC can obtain locks for the afterC step. This is that thread
@@ -88,39 +82,26 @@ public class TXStateProxyImpl implements TXStateProxy {
return this.synchRunnable;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getSemaphore()
- */
public ReentrantLock getLock() {
return this.lock;
}
-
- /**
- * @return the isJTA
- */
final boolean isJTA() {
return isJTA;
}
- /**
- * @return the txId
- */
final public TXId getTxId() {
return txId;
}
- /**
- * @return the txMgr
- */
public final TXManagerImpl getTxMgr() {
return txMgr;
}
protected volatile TXStateInterface realDeal;
+
protected boolean inProgress = true;
+
protected InternalDistributedMember onBehalfOfClientMember = null;
/**
@@ -184,10 +165,6 @@ public class TXStateProxyImpl implements TXStateProxy {
return this.realDeal;
}
- /**
- * @param managerImpl
- * @param id
- */
public TXStateProxyImpl(TXManagerImpl managerImpl, TXId id,
InternalDistributedMember clientMember) {
this.txMgr = managerImpl;
@@ -196,11 +173,6 @@ public class TXStateProxyImpl implements TXStateProxy {
this.onBehalfOfClientMember = clientMember;
}
- /**
- * @param managerImpl
- * @param id
- * @param isjta
- */
public TXStateProxyImpl(TXManagerImpl managerImpl, TXId id, boolean isjta) {
this.txMgr = managerImpl;
this.txId = id;
@@ -219,16 +191,6 @@ public class TXStateProxyImpl implements TXStateProxy {
this.isJTA = isJTA;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#checkJTA(java.lang.String)
- */
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateProxyInterface#checkJTA(java.lang.String)
- */
public void checkJTA(String errmsg) throws IllegalStateException {
if (isJTA()) {
throw new IllegalStateException(errmsg);
@@ -243,11 +205,6 @@ public class TXStateProxyImpl implements TXStateProxy {
.toLocalizedString("precommit"));
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#commit()
- */
public void commit() throws CommitConflictException {
boolean preserveTx = false;
try {
@@ -284,12 +241,6 @@ public class TXStateProxyImpl implements TXStateProxy {
return (TransactionException) e;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#containsValueForKey(java.lang.Object,
- * org.apache.geode.internal.cache.LocalRegion)
- */
public boolean containsValueForKey(KeyInfo keyInfo, LocalRegion region) {
try {
this.operationCount++;
@@ -302,7 +253,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
private void trackBucketForTx(KeyInfo keyInfo) {
- GemFireCacheImpl cache = (GemFireCacheImpl) txMgr.getCache();
if (keyInfo.getBucketId() >= 0) {
if (logger.isDebugEnabled()) {
logger.debug("adding bucket:{} for tx:{}", keyInfo.getBucketId(), getTransactionId());
@@ -313,13 +263,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.TXStateInterface#destroyExistingEntry(org.apache.geode.internal
- * .cache.EntryEventImpl, boolean, java.lang.Object)
- */
public void destroyExistingEntry(EntryEventImpl event, boolean cacheWrite,
Object expectedOldValue) throws EntryNotFoundException {
try {
@@ -332,40 +275,19 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getBeginTime()
- */
public long getBeginTime() {
return getRealDeal(null, null).getBeginTime();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getCache()
- */
public Cache getCache() {
return txMgr.getCache();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getChanges()
- */
public int getChanges() {
assertBootstrapped();
return getRealDeal(null, null).getChanges();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getDeserializedValue(java.lang.Object,
- * org.apache.geode.internal.cache.LocalRegion, boolean)
- */
public Object getDeserializedValue(KeyInfo keyInfo, LocalRegion localRegion, boolean updateStats,
boolean disableCopyOnRead, boolean preferCD, EntryEventImpl clientEvent,
boolean returnTombstones, boolean retainResult) {
@@ -379,12 +301,6 @@ public class TXStateProxyImpl implements TXStateProxy {
return val;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getEntry(java.lang.Object,
- * org.apache.geode.internal.cache.LocalRegion)
- */
public Entry getEntry(KeyInfo keyInfo, LocalRegion region, boolean allowTombstones) {
try {
this.operationCount++;
@@ -396,51 +312,25 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getEvent()
- */
public TXEvent getEvent() {
assertBootstrapped();
return getRealDeal(null, null).getEvent();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getEvents()
- */
public List getEvents() {
assertBootstrapped();
return getRealDeal(null, null).getEvents();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getRegions()
- */
public Collection<LocalRegion> getRegions() {
assertBootstrapped();
return getRealDeal(null, null).getRegions();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#getTransactionId()
- */
public TransactionId getTransactionId() {
return txId;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#invalidateExistingEntry(org.apache.geode.
- * internal.cache.EntryEventImpl, boolean, boolean)
- */
public void invalidateExistingEntry(EntryEventImpl event, boolean invokeCallbacks,
boolean forceNewEntry) {
try {
@@ -453,11 +343,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#isInProgress()
- */
public boolean isInProgress() {
return inProgress;
}
@@ -467,54 +352,26 @@ public class TXStateProxyImpl implements TXStateProxy {
this.inProgress = progress;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#needsLargeModCount()
- */
public boolean needsLargeModCount() {
assertBootstrapped();
return getRealDeal(null, null).needsLargeModCount();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#nextModSerialNum()
- */
public int nextModSerialNum() {
assertBootstrapped();
return getRealDeal(null, null).nextModSerialNum();
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.TXStateInterface#readRegion(org.apache.geode.internal.cache.
- * LocalRegion)
- */
public TXRegionState readRegion(LocalRegion r) {
assertBootstrapped();
return getRealDeal(null, r).readRegion(r);
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#rmRegion(org.apache.geode.internal.cache.
- * LocalRegion)
- */
public void rmRegion(LocalRegion r) {
assertBootstrapped();
getRealDeal(null, r).rmRegion(r);
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#rollback()
- */
public void rollback() {
try {
getRealDeal(null, null).rollback();
@@ -526,13 +383,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.TXStateInterface#txPutEntry(org.apache.geode.internal.cache.
- * EntryEventImpl, boolean, boolean, boolean)
- */
public boolean txPutEntry(EntryEventImpl event, boolean ifNew, boolean requireOldValue,
boolean checkResources, Object expectedOldValue) {
try {
@@ -546,12 +396,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#txReadEntry(java.lang.Object,
- * org.apache.geode.internal.cache.LocalRegion, boolean)
- */
public TXEntryState txReadEntry(KeyInfo keyInfo, LocalRegion localRegion, boolean rememberRead,
boolean createTxEntryIfAbsent) {
try {
@@ -565,36 +409,15 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.TXStateInterface#txReadRegion(org.apache.geode.internal.cache.
- * LocalRegion)
- */
public TXRegionState txReadRegion(LocalRegion localRegion) {
assertBootstrapped();
return getRealDeal(null, localRegion).txReadRegion(localRegion);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.TXStateInterface#txWriteRegion(org.apache.geode.internal.cache.
- * LocalRegion, java.lang.Object)
- */
public TXRegionState txWriteRegion(LocalRegion localRegion, KeyInfo entryKey) {
return getRealDeal(entryKey, localRegion).txWriteRegion(localRegion, entryKey);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.TXStateInterface#writeRegion(org.apache.geode.internal.cache.
- * LocalRegion)
- */
public TXRegionState writeRegion(LocalRegion r) {
assertBootstrapped();
return getRealDeal(null, r).writeRegion(r);
@@ -604,11 +427,6 @@ public class TXStateProxyImpl implements TXStateProxy {
assert realDeal != null;
}
- /*
- * (non-Javadoc)
- *
- * @see javax.transaction.Synchronization#afterCompletion(int)
- */
public void afterCompletion(int status) {
assertBootstrapped();
try {
@@ -621,22 +439,11 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see javax.transaction.Synchronization#beforeCompletion()
- */
public void beforeCompletion() {
assertBootstrapped();
getRealDeal(null, null).beforeCompletion();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.InternalDataView#containsKey(java.lang.Object,
- * org.apache.geode.internal.cache.LocalRegion)
- */
public boolean containsKey(KeyInfo keyInfo, LocalRegion localRegion) {
try {
this.operationCount++;
@@ -648,13 +455,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.InternalDataView#entryCount(org.apache.geode.internal.cache.
- * LocalRegion)
- */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "UL_UNRELEASED_LOCK",
justification = "This method unlocks and then conditionally undoes the unlock in the finally-block. Review again at later time.")
public int entryCount(LocalRegion localRegion) {
@@ -684,13 +484,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.InternalDataView#findObject(org.apache.geode.internal.cache.
- * LocalRegion, java.lang.Object, java.lang.Object, boolean, boolean, java.lang.Object)
- */
public Object findObject(KeyInfo key, LocalRegion r, boolean isCreate, boolean generateCallbacks,
Object value, boolean disableCopyOnRead, boolean preferCD,
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
@@ -706,13 +499,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.InternalDataView#getAdditionalKeysForIterator(org.apache.geode.
- * internal.cache.LocalRegion)
- */
public Set getAdditionalKeysForIterator(LocalRegion currRgn) {
if (this.realDeal == null) {
return null;
@@ -720,13 +506,6 @@ public class TXStateProxyImpl implements TXStateProxy {
return getRealDeal(null, currRgn).getAdditionalKeysForIterator(currRgn);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.InternalDataView#getEntryForIterator(org.apache.geode.internal.
- * cache.LocalRegion, java.lang.Object, boolean)
- */
public Object getEntryForIterator(KeyInfo key, LocalRegion currRgn, boolean rememberReads,
boolean allowTombstones) {
boolean resetTxState = this.realDeal == null;
@@ -745,15 +524,8 @@ public class TXStateProxyImpl implements TXStateProxy {
getTxMgr().internalResume(txp);
}
}
-
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.InternalDataView#getKeyForIterator(java.lang.Object,
- * org.apache.geode.internal.cache.LocalRegion, boolean)
- */
public Object getKeyForIterator(KeyInfo keyInfo, LocalRegion currRgn, boolean rememberReads,
boolean allowTombstones) {
boolean resetTxState = this.realDeal == null;
@@ -775,33 +547,16 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.InternalDataView#getValueInVM(java.lang.Object,
- * org.apache.geode.internal.cache.LocalRegion, boolean)
- */
public Object getValueInVM(KeyInfo keyInfo, LocalRegion localRegion, boolean rememberRead) {
this.operationCount++;
return getRealDeal(keyInfo, localRegion).getValueInVM(keyInfo, localRegion, rememberRead);
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.InternalDataView#isDeferredStats()
- */
public boolean isDeferredStats() {
assertBootstrapped();
return getRealDeal(null, null).isDeferredStats();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.InternalDataView#putEntry(org.apache.geode.internal.cache.
- * EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean)
- */
public boolean putEntry(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
boolean overwriteDestroyed) {
@@ -816,34 +571,14 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateInterface#isInProgressAndSameAs(org.apache.geode.
- * internal.cache.TXStateInterface)
- */
public boolean isInProgressAndSameAs(TXStateInterface otherState) {
return isInProgress() && otherState == this;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.TXStateProxy#setLocalTXState(org.apache.geode.internal.cache.
- * TXState)
- */
public void setLocalTXState(TXStateInterface state) {
this.realDeal = state;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.InternalDataView#getSerializedValue(org.apache.geode.internal.
- * cache.LocalRegion, java.lang.Object, java.lang.Object)
- */
public Object getSerializedValue(LocalRegion localRegion, KeyInfo key, boolean doNotLockEntry,
ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent,
boolean returnTombstones) throws DataLocationException {
@@ -852,13 +587,6 @@ public class TXStateProxyImpl implements TXStateProxy {
requestingClient, clientEvent, returnTombstones);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.InternalDataView#putEntryOnRemote(org.apache.geode.internal.
- * cache.EntryEventImpl, boolean, boolean, java.lang.Object, boolean, long, boolean)
- */
public boolean putEntryOnRemote(EntryEventImpl event, boolean ifNew, boolean ifOld,
Object expectedOldValue, boolean requireOldValue, long lastModified,
boolean overwriteDestroyed) throws DataLocationException {
@@ -873,12 +601,6 @@ public class TXStateProxyImpl implements TXStateProxy {
return getRealDeal(null, null).isFireCallbacks();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.InternalDataView#destroyOnRemote(java.lang.Integer,
- * org.apache.geode.internal.cache.EntryEventImpl, java.lang.Object)
- */
public void destroyOnRemote(EntryEventImpl event, boolean cacheWrite, Object expectedOldValue)
throws DataLocationException {
this.operationCount++;
@@ -887,13 +609,6 @@ public class TXStateProxyImpl implements TXStateProxy {
tx.destroyOnRemote(event, cacheWrite, expectedOldValue);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.InternalDataView#invalidateOnRemote(org.apache.geode.internal.
- * cache.EntryEventImpl, boolean, boolean)
- */
public void invalidateOnRemote(EntryEventImpl event, boolean invokeCallbacks,
boolean forceNewEntry) throws DataLocationException {
this.operationCount++;
@@ -919,13 +634,6 @@ public class TXStateProxyImpl implements TXStateProxy {
LocalizedStrings.TXState_REGION_CLEAR_NOT_SUPPORTED_IN_A_TRANSACTION.toLocalizedString());
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.InternalDataView#getBucketKeys(org.apache.geode.internal.cache.
- * LocalRegion, int)
- */
public Set getBucketKeys(LocalRegion localRegion, int bucketId, boolean allowTombstones) {
// if this the first operation in a transaction, reset txState
boolean resetTxState = this.realDeal == null;
@@ -945,12 +653,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.InternalDataView#getEntryOnRemote(java.lang.Object,
- * org.apache.geode.internal.cache.LocalRegion)
- */
public Entry getEntryOnRemote(KeyInfo keyInfo, LocalRegion localRegion, boolean allowTombstones)
throws DataLocationException {
this.operationCount++;
@@ -963,33 +665,15 @@ public class TXStateProxyImpl implements TXStateProxy {
getRealDeal(null, null);
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateProxy#getTarget()
- */
public DistributedMember getTarget() {
return this.target;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateProxy#setTarget(org.apache.geode.distributed.
- * DistributedMember)
- */
public void setTarget(DistributedMember target) {
assert this.target == null;
getRealDeal(target);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.geode.internal.cache.InternalDataView#getRegionKeysForIteration(org.apache.geode.
- * internal.cache.LocalRegion)
- */
public Collection<?> getRegionKeysForIteration(LocalRegion currRegion) {
if (currRegion.isUsedForPartitionedRegionBucket()) {
return currRegion.getRegionKeysForIteration();
@@ -998,20 +682,10 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateProxy#isCommitRequestedByOwner()
- */
public boolean isCommitOnBehalfOfRemoteStub() {
return this.commitRequestedByOwner;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.geode.internal.cache.TXStateProxy#setCommitRequestedByOwner()
- */
public boolean setCommitOnBehalfOfRemoteStub(boolean requestedByOwner) {
return this.commitRequestedByOwner = requestedByOwner;
}
@@ -1039,7 +713,8 @@ public class TXStateProxyImpl implements TXStateProxy {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("TXStateProxyImpl@").append(System.identityHashCode(this)).append(" txId:")
- .append(this.txId).append(" realDeal:" + this.realDeal).append(" isJTA:").append(isJTA);
+ .append(this.txId).append(" realDeal:").append(this.realDeal).append(" isJTA:")
+ .append(isJTA);
return builder.toString();
}
@@ -1051,7 +726,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
-
public boolean isMemberIdForwardingRequired() {
if (this.realDeal == null) {
return false;
@@ -1060,7 +734,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
-
public TXCommitMessage getCommitMessage() {
if (this.realDeal == null) {
return null;
@@ -1069,7 +742,6 @@ public class TXStateProxyImpl implements TXStateProxy {
}
}
-
public void postPutAll(DistributedPutAllOperation putallOp, VersionedObjectList successfulPuts,
LocalRegion region) {
if (putallOp.putAllData.length == 0) {
@@ -1159,7 +831,6 @@ public class TXStateProxyImpl implements TXStateProxy {
// Do nothing. Not applicable for transactions.
}
-
public void close() {
if (this.realDeal != null) {
this.realDeal.close();