You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/23 21:23:27 UTC
[31/94] [abbrv] [partial] incubator-geode git commit: Merge
remote-tracking branch 'origin/develop' into feature/GEODE-917
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
index 83b00b4,0000000..3d6ce73
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
@@@ -1,811 -1,0 +1,812 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.client.PoolFactory;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.Assert;
+import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
+import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
+import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
+
+/**
+ * EventTracker tracks the last sequence number for a particular
+ * memberID:threadID. It is used to avoid replaying events in
+ * client/server and partitioned-region configurations.
+ *
+ * @author bruce
+ * @since 6.0
+ */
+public class EventTracker
+{
+ private static final Logger logger = LogService.getLogger();
+
+ /**
+ * a mapping of originator to the last event applied to this cache
+ *
+ * Keys are instances of {@link ThreadIdentifier}, values are instances
+ * of {@link com.gemstone.gemfire.internal.cache.EventTracker.EventSeqnoHolder}.
+ */
+ protected final ConcurrentMap<ThreadIdentifier, EventSeqnoHolder> recordedEvents
+ = new ConcurrentHashMap<ThreadIdentifier, EventSeqnoHolder>(100);
+
+ /** a mapping of originator to bulkOp's last status (true means
+ * finished processing) applied to this cache.
+ *
+ * Keys are instances of {@link ThreadIdentifier}, values are instances
+ * of {@link BulkOpProcessed}.
+ */
+ private final ConcurrentMap<ThreadIdentifier, BulkOpProcessed> recordedBulkOps
+ = new ConcurrentHashMap<ThreadIdentifier, BulkOpProcessed>(100);
+
+ /** a mapping of originator to bulkOperation's last version tags. This map
+ * differs from {@link #recordedBulkOps} in that the thread identifier used
+ * here is the base member id and thread id of the bulk op, as opposed to the fake
+ * thread id which is assigned for each bucket.
+ *
+ * recordedBulkOps are also only tracked on the secondary for partitioned regions
+ * recordedBulkOpVersionTags are tracked on both the primary and secondary.
+ *
+ * Keys are instances of {@link ThreadIdentifier}, values are instances
+ * of {@link BulkOpHolder}.
+ */
+ private final ConcurrentMap<ThreadIdentifier, BulkOpHolder> recordedBulkOpVersionTags
+ = new ConcurrentHashMap<ThreadIdentifier, BulkOpHolder>(100);
+
+ /**
+ * The member that the region corresponding to this tracker (if any)
+ * received its initial image from (if a replicate)
+ */
+ private volatile InternalDistributedMember initialImageProvider;
+
+
+ /**
+ * The cache associated with this tracker
+ */
+ GemFireCacheImpl cache;
+
+ /**
+ * The name of this tracker
+ */
+ String name;
+
+ /**
+ * whether or not this tracker has been initialized with state from
+ * another process
+ */
+ volatile boolean initialized;
+
+ /**
+ * object used to wait for initialization
+ */
+ final StoppableCountDownLatch initializationLatch;
+
+ /**
+  * Create the expiry task used by event trackers and schedule it at a fixed
+  * rate on the timer returned by <code>cache.getCCPTimer()</code>. The interval
+  * is taken from the <code>gemfire.messageTrackingTimeout</code> system property
+  * and falls back to one third of
+  * {@link PoolFactory#DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT}. The same
+  * value is used as the initial delay, the repeat period and the expiry time
+  * handed to the task.
+  * @param cache the cache to schedule tasks with
+  * @return the task that was scheduled
+  */
+ public static ExpiryTask startTrackerServices(GemFireCacheImpl cache) {
+   final long fallbackInterval = PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3;
+   final long sweepInterval =
+       Long.getLong("gemfire.messageTrackingTimeout", fallbackInterval).longValue();
+   final ExpiryTask sweeper = new ExpiryTask(cache, sweepInterval);
+   cache.getCCPTimer().scheduleAtFixedRate(sweeper, sweepInterval, sweepInterval);
+   return sweeper;
+ }
+
+ /**
+ * Terminate the tracker's timer task
+ * @param cache the cache holding the tracker task
+ */
+ public static void stopTrackerServices(GemFireCacheImpl cache) {
+ cache.getEventTrackerTask().cancel();
+ }
+
+ /**
+  * Build a tracker bound to the given region. The tracker takes the region's
+  * cache, derives its name from the region name and creates its
+  * initialization latch from the region's stopper with a count of one.
+  * @param region the cache region to associate with this tracker
+  */
+ public EventTracker(LocalRegion region) {
+   cache = region.cache;
+   name = "Event Tracker for " + region.getName();
+   initializationLatch = new StoppableCountDownLatch(region.stopper, 1);
+ }
+
+ /** start this event tracker */
+ public void start() {
+ if (this.cache.getEventTrackerTask() != null) {
+ this.cache.getEventTrackerTask().addTracker(this);
+ }
+ }
+
+ /** stop this event tracker */
+ public void stop() {
+ if (this.cache.getEventTrackerTask() != null) {
+ this.cache.getEventTrackerTask().removeTracker(this);
+ }
+ }
+
+ /**
+  * Produce a copy of the recorded events: every entry is mapped to a fresh
+  * {@link EventSeqnoHolder} carrying only the last sequence number. No
+  * synchronization is used while the state is being copied.
+  * @return a new map from thread identifier to a copied holder
+  */
+ public Map<ThreadIdentifier, EventSeqnoHolder> getState() {
+   final Map<ThreadIdentifier, EventSeqnoHolder> snapshot =
+       new HashMap<ThreadIdentifier, EventSeqnoHolder>(recordedEvents.size());
+   for (Map.Entry<ThreadIdentifier, EventSeqnoHolder> recorded : recordedEvents.entrySet()) {
+     // don't transfer version tags - adds too much bulk just so we can do client tag recovery
+     final EventSeqnoHolder copy = new EventSeqnoHolder(recorded.getValue().lastSeqno, null);
+     snapshot.put(recorded.getKey(), copy);
+   }
+   return snapshot;
+ }
+
+ /**
+  * Record the given state in the tracker and mark the tracker initialized.
+  * Each entry is recorded with the "if absent" flag set, so an entry that
+  * already exists locally is left as it is.
+  * @param provider the member that provided this state
+  * @param state a Map obtained from getState();
+  */
+ public void recordState(InternalDistributedMember provider, Map<ThreadIdentifier, EventSeqnoHolder> state) {
+   this.initialImageProvider = provider;
+   // the report is only assembled when debug logging is on
+   final StringBuffer report = logger.isDebugEnabled() ? new StringBuffer(200) : null;
+   if (report != null) {
+     report.append("Recording initial state for ");
+     report.append(this.name);
+     report.append(": ");
+   }
+   for (Map.Entry<ThreadIdentifier, EventSeqnoHolder> received : state.entrySet()) {
+     if (report != null) {
+       report.append("\n ");
+       report.append(received.getKey().expensiveToString());
+       report.append("; sequenceID=");
+       report.append(received.getValue());
+     }
+     recordSeqno(received.getKey(), received.getValue(), true);
+   }
+   if (report != null) {
+     logger.debug(report);
+   }
+   // fix for bug 41622 - hang in GII. This keeps ops from waiting for the
+   // full GII to complete
+   setInitialized();
+ }
+
+ /**
+  * Use this method to ensure that the tracker is put in an initialized state.
+  * The initialized flag is published before the latch is released, so a
+  * thread that returns from {@link #waitOnInitialization()} also observes
+  * {@link #isInitialized()} returning true.
+  */
+ public void setInitialized() {
+   // set the volatile flag first; counting down first would let a woken
+   // waiter read initialized == false
+   this.initialized = true;
+   this.initializationLatch.countDown();
+ }
+
+ /**
+ * Wait for the tracker to finish being initialized. Blocks on the
+ * initialization latch, which {@link #setInitialized()} counts down.
+ * @throws InterruptedException propagated from the latch's await()
+ */
+ public void waitOnInitialization() throws InterruptedException {
+ this.initializationLatch.await();
+ }
+
+ /**
+ * Record an event sequence id if it is higher than what we currently have.
+ * Delegates to the three-argument overload with ifAbsent set to false, so an
+ * existing entry is updated when the new sequence number is larger.
+ * @param membershipID the thread identifier used as the key in the recorded-events map
+ * @param evhObj the holder carrying the sequence number (and optional version tag) to record
+ */
+ protected void recordSeqno(ThreadIdentifier membershipID, EventSeqnoHolder evhObj){
+ recordSeqno(membershipID, evhObj, false);
+ }
+
+ /**
+ * Record an event sequence id if it is higher than what we currently have.
+ * The holder is installed with putIfAbsent. When an entry already exists it
+ * is updated in place under its own monitor, unless ifAbsent is set, in which
+ * case the existing entry is left untouched. If the existing entry has been
+ * flagged as removed by the sweeper the whole attempt is retried.
+ * @param threadID the thread identifier used as the key in the recorded-events map
+ * @param evh the holder carrying the sequence number and version tag to record
+ * @param ifAbsent only record this state if there's not already an entry for this memberID
+ */
+ private void recordSeqno(ThreadIdentifier threadID, EventSeqnoHolder evh, boolean ifAbsent) {
+ boolean removed;
+ if (logger.isDebugEnabled()) {
+ logger.debug("recording {} {}", threadID.expensiveToString(), evh.toString());
+ }
+ do {
+ removed = false;
+ EventSeqnoHolder oldEvh = recordedEvents.putIfAbsent(
+ threadID, evh);
+ if (oldEvh != null) {
+ // an entry already exists; inspect and update it under its own monitor
+ synchronized(oldEvh) {
+ if (oldEvh.removed) {
+ // need to wait for an entry being removed by the sweeper to go away
+ removed = true;
+ // continue jumps to the while(removed) test, which loops again
+ continue;
+ }
+ else {
+ if (ifAbsent) {
+ // leaves the do/while: the existing entry wins
+ break;
+ }
+ // a zero timer makes the sweeper treat the holder as new on its next pass
+ oldEvh.endOfLifeTimer = 0;
+ if (oldEvh.lastSeqno < evh.lastSeqno) {
+ oldEvh.lastSeqno = evh.lastSeqno;
+ oldEvh.versionTag = evh.versionTag;
+// Exception e = oldEvh.context;
+// oldEvh.context = new Exception("stack trace");
+// oldEvh.context.initCause(e);
+ }
+ }
+ }
+ }
+ else {
+ // our holder was installed as the entry for this thread identifier
+ evh.endOfLifeTimer = 0;
+// evh.context = new Exception("stack trace");
+ }
+ } while (removed);
+ }
+
+ /**
+ * Record the event's threadid/sequenceid to prevent replay. Events for which
+ * ignoreEvent() returns true are not tracked. When the region has no server
+ * proxy the event's version tag is stored with the sequence number, after its
+ * member ids have been replaced with the canonical ids from the region's
+ * version vector (if the region has one).
+ * @param event the event to record
+ */
+ public void recordEvent(InternalCacheEvent event) {
+ EventID eventID = event.getEventId();
+ if (ignoreEvent(event, eventID)) {
+ return; // not tracked
+ }
+
+ LocalRegion lr = (LocalRegion)event.getRegion();
+
+ ThreadIdentifier membershipID = new ThreadIdentifier(eventID.getMembershipID(),
+ eventID.getThreadID());
+
+ VersionTag tag = null;
+ if (lr.getServerProxy() == null/* && event.hasClientOrigin()*/) { // clients do not need to store version tags for replayed events
+ tag = event.getVersionTag();
+ RegionVersionVector v = ((LocalRegion)event.getRegion()).getVersionVector();
+ // bug #46453 - make sure ID references are canonical before storing
+ if (v != null && tag != null) {
+ tag.setMemberID(v.getCanonicalId(tag.getMemberID()));
+ if (tag.getPreviousMemberID() != null) {
+ tag.setPreviousMemberID(v.getCanonicalId(tag.getPreviousMemberID()));
+ }
+ }
+ }
- EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag);
- if (logger.isTraceEnabled()){
- logger.trace("region event tracker recording {}", event);
- }
- recordSeqno(membershipID, newEvh);
+
+ //If this is a bulkOp, and concurrency checks are enabled, we need to
+ //save the version tag in case we retry.
+ if (lr.concurrencyChecksEnabled
+ && (event.getOperation().isPutAll() || event.getOperation().isRemoveAll()) && lr.getServerProxy() == null) {
+ recordBulkOpEvent(event, membershipID);
+ }
++
++ EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag);
++ if (logger.isTraceEnabled()){
++ logger.trace("region event tracker recording {}", event);
++ }
++ recordSeqno(membershipID, newEvh);
+ }
+
+ /**
+ * Record a version tag for a bulk operation
+ */
+ private void recordBulkOpEvent(InternalCacheEvent event, ThreadIdentifier tid) {
+ EventID eventID = event.getEventId();
+
+ VersionTag tag = event.getVersionTag();
+ if (tag == null) {
+ return;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("recording bulkOp event {} {} {} op={}", tid.expensiveToString(),
+ eventID, tag, event.getOperation());
+ }
+
+ RegionVersionVector v = ((LocalRegion)event.getRegion()).getVersionVector();
+ // bug #46453 - make sure ID references are canonical before storing
+ if (v != null) {
+ tag.setMemberID(v.getCanonicalId(tag.getMemberID()));
+ if (tag.getPreviousMemberID() != null) {
+ tag.setPreviousMemberID(v.getCanonicalId(tag.getPreviousMemberID()));
+ }
+ }
+
+ //Loop until we can successfully update the recorded bulk operations
+ //For this thread id.
+ boolean retry = false;
+ do {
+ BulkOpHolder bulkOpTracker = recordedBulkOpVersionTags.get(tid);
+ if(bulkOpTracker == null) {
+ bulkOpTracker = new BulkOpHolder();
+ BulkOpHolder old = recordedBulkOpVersionTags.putIfAbsent(tid, bulkOpTracker);
+ if(old != null) {
+ retry = true;
+ continue;
+ }
+ }
+ synchronized(bulkOpTracker) {
+ if(bulkOpTracker.removed) {
+ retry = true;
+ continue;
+ }
+
+ //Add the version tag for bulkOp event.
+ bulkOpTracker.putVersionTag(eventID, event.getVersionTag());
+ retry = false;
+ }
+ } while(retry);
+ }
+
+ /**
+  * Check whether the given event has already been recorded.
+  * @param event the event to look up
+  * @return false when the event is not tracked (see ignoreEvent) or has not
+  *         been seen, true when it is a replay
+  */
+ public boolean hasSeenEvent(InternalCacheEvent event) {
+   final EventID eventID = event.getEventId();
+   // events that are not tracked are never reported as seen
+   return !ignoreEvent(event, eventID) && hasSeenEvent(eventID, event);
+ }
+
+ /**
+  * Check whether an event with the given id has already been recorded,
+  * without recovering a version tag.
+  * @param eventID the id to look up
+  * @return true if the id is not newer than the last one recorded for its source
+  */
+ public boolean hasSeenEvent(EventID eventID) {
+   return hasSeenEvent(eventID, null);
+ }
+
+ /**
+  * Check whether an event with the given id has already been recorded. When
+  * the id equals the last one recorded and a version tag was stored with it,
+  * the tag is copied into tagHolder.
+  * @param eventID the id to look up
+  * @param tagHolder event that receives the recovered version tag; may be null
+  * @return true if the id is not newer than the last one recorded for its source
+  */
+ public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) {
+   final ThreadIdentifier source = new ThreadIdentifier(
+       eventID.getMembershipID(), eventID.getThreadID());
+   final EventSeqnoHolder recorded = recordedEvents.get(source);
+   if (recorded == null) {
+     return false;
+   }
+
+   synchronized (recorded) {
+     if (recorded.removed) {
+       return false;
+     }
+     if (recorded.lastSeqno < eventID.getSequenceID()) {
+       return false;
+     }
+     // log at fine because partitioned regions can send event multiple times
+     // during normal operation during bucket region initialization
+     if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) {
+       logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, "Cache encountered replay of event with ID {}. Highest recorded for this source is {}",
+           eventID, recorded.lastSeqno);
+     }
+     // bug #44956 - recover version tag for duplicate event
+     final boolean sameEvent = recorded.lastSeqno == eventID.getSequenceID();
+     if (sameEvent && tagHolder != null && recorded.versionTag != null) {
+       ((EntryEventImpl)tagHolder).setVersionTag(recorded.versionTag);
+     }
+     return true;
+   }
+ }
+
+ /**
+  * Look up the version tag stored with the last event recorded for the
+  * source of the given event id.
+  * @param eventID the id of the event whose tag is wanted
+  * @return the stored tag, or null when nothing is recorded for the source,
+  *         when the last recorded sequence number differs from the id's, or
+  *         when no tag was stored
+  */
+ public VersionTag findVersionTag(EventID eventID) {
+   final ThreadIdentifier source = new ThreadIdentifier(
+       eventID.getMembershipID(), eventID.getThreadID());
+   final EventSeqnoHolder recorded = recordedEvents.get(source);
+
+   if (recorded == null) {
+     if (logger.isDebugEnabled()) {
+       logger.debug("search for version tag failed as no event is recorded for {}", source.expensiveToString());
+     }
+     return null;
+   }
+
+   synchronized (recorded) {
+     if (logger.isDebugEnabled()) {
+       logger.debug("search for version tag located last event for {}: {}",source.expensiveToString(), recorded);
+     }
+     if (recorded.lastSeqno == eventID.getSequenceID()) {
+       // log at fine because partitioned regions can send event multiple times
+       // during normal operation during bucket region initialization
+       if (recorded.versionTag == null && logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) {
+         logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, "Could not recover version tag. Found event holder with no version tag for {}", eventID);
+       }
+       return recorded.versionTag;
+     }
+     return null;
+   }
+ }
+
+ /**
+  * Gateway variant of the version tag lookup: the stored tag is returned as
+  * long as the last recorded sequence number is not lower than the one in
+  * the given event id.
+  * @param eventID the id of the event whose tag is wanted
+  * @return the stored tag, or null when nothing is recorded for the source,
+  *         when the recorded sequence number is lower than the id's, or when
+  *         no tag was stored
+  */
+ public VersionTag findVersionTagForGateway(EventID eventID) {
+   final ThreadIdentifier source = new ThreadIdentifier(
+       eventID.getMembershipID(), eventID.getThreadID());
+   final EventSeqnoHolder recorded = recordedEvents.get(source);
+
+   if (recorded == null) {
+     if (logger.isDebugEnabled()) {
+       logger.debug("search for version tag failed as no event is recorded for {}", source.expensiveToString());
+     }
+     return null;
+   }
+
+   synchronized (recorded) {
+     if (logger.isDebugEnabled()) {
+       logger.debug("search for version tag located last event for {}: {} {}",source.expensiveToString(), recorded, eventID.getSequenceID() );
+     }
+
+     if (recorded.lastSeqno >= eventID.getSequenceID()) {
+       // log at fine because partitioned regions can send event multiple times
+       // during normal operation during bucket region initialization
+       if (recorded.versionTag == null && logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) {
+         logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, "Could not recover version tag. Found event holder with no version tag for {}", eventID);
+       }
+       return recorded.versionTag;
+     }
+     return null;
+   }
+ }
+
+
+ /**
+  * Look up the version tag recorded for one event of a bulk operation.
+  * @param eventID the id of the event whose tag is wanted
+  * @return the tag stored under that id in the bulk-op holder of the event's
+  *         source, or null when there is no holder or no tag for the id
+  */
+ public VersionTag findVersionTagForBulkOp(EventID eventID) {
+   final ThreadIdentifier source = new ThreadIdentifier(
+       eventID.getMembershipID(), eventID.getThreadID());
+   final BulkOpHolder holder = recordedBulkOpVersionTags.get(source);
+
+   if (holder != null) {
+     synchronized (holder) {
+       if (logger.isDebugEnabled()) {
+         logger.debug("search for version tag located event holder for {}: {}", source.expensiveToString(), holder);
+       }
+       return holder.entryVersionTags.get(eventID);
+     }
+   }
+
+   if (logger.isDebugEnabled()) {
+     logger.debug("search for version tag failed as no events are recorded for {}", source.expensiveToString());
+   }
+   return null;
+ }
+
+ /**
+ * @param event
+ * @param eventID
+ * @return true if the event should not be tracked, false otherwise
+ */
+ private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) {
+ if (eventID == null) {
+ return true;
+ } else {
+ boolean isVersioned = (event.getVersionTag() != null);
+ boolean isClient = event.hasClientOrigin();
+ if (isVersioned && isClient) {
+ return false; // version tags for client events are kept for retries by the client
+ }
+ boolean isEntry = event.getOperation().isEntry();
+ boolean isPr = event.getRegion().getAttributes().getDataPolicy().withPartitioning()
+ || ((LocalRegion)event.getRegion()).isUsedForPartitionedRegionBucket();
+ return (!isClient && // ignore if it originated on a server, and
+ isEntry && // it affects an entry and
+ !isPr); // is not on a PR
+ }
+ }
+
+ /**
+  * A routine to provide synchronization running based on <memberShipID, threadID>
+  * of the requesting client. The runnable is executed while holding the
+  * monitor of a per-originator {@link BulkOpProcessed} object, so two bulk
+  * operations with the same originator do not run concurrently through this
+  * method.
+  * @param r - a Runnable to wrap the processing of the bulk op
+  * @param eventID - the base event ID of the bulk op; must not be null
+  *
+  * @since 5.7
+  */
+ public void syncBulkOp(Runnable r, EventID eventID) {
+   Assert.assertTrue(eventID != null);
+   ThreadIdentifier membershipID = new ThreadIdentifier(
+       eventID.getMembershipID(), eventID.getThreadID());
+
+   // Hold on to the object we try to install. Re-reading it from the map after
+   // putIfAbsent can return null if another thread removes the mapping in
+   // between, which would make the synchronized statement below throw a
+   // NullPointerException.
+   BulkOpProcessed candidate = new BulkOpProcessed(false);
+   BulkOpProcessed opSyncObj = recordedBulkOps.putIfAbsent(membershipID, candidate);
+   if (opSyncObj == null) {
+     opSyncObj = candidate;
+   }
+   synchronized (opSyncObj) {
+     try {
+       // NOTE(review): because the debug check is part of this condition, the
+       // operation is run again whenever debug logging is disabled, even if
+       // the status says it was already performed. Left as is - confirm the
+       // intended behavior before changing it.
+       if (opSyncObj.getStatus() && logger.isDebugEnabled()) {
+         logger.debug("SyncBulkOp: The operation was performed by another thread.");
+       }
+       else {
+         recordBulkOpStart(membershipID);
+
+         //Perform the bulk op
+         r.run();
+         // set to true in case another thread is waiting at sync
+         opSyncObj.setStatus(true);
+       }
+     }
+     finally {
+       // remove only the mapping to our own sync object so that an object
+       // installed in the meantime by another thread is not evicted
+       recordedBulkOps.remove(membershipID, opSyncObj);
+     }
+   }
+ }
+
+ /**
+  * Called when a bulkOp is started on the local region. Drops the version
+  * tags recorded for the given thread identifier by a previous bulkOp.
+  * @param tid the thread identifier of the bulkOp that is starting
+  */
+ public void recordBulkOpStart(ThreadIdentifier tid) {
+   final boolean debugging = logger.isDebugEnabled();
+   if (debugging) {
+     logger.debug("recording bulkOp start for {}", tid.expensiveToString());
+   }
+   recordedBulkOpVersionTags.remove(tid);
+ }
+
+ /**
+ * @return true once {@link #setInitialized()} has been called on this tracker
+ */
+ public boolean isInitialized() {
+ return this.initialized;
+ }
+
+ /**
+  * Tell whether a member is the one recorded as the provider of this
+  * tracker's initial state.
+  * @param mbr the member in question
+  * @return true if the given member provided the initial image event state for this tracker
+  */
+ public boolean isInitialImageProvider(DistributedMember mbr) {
+   if (this.initialImageProvider == null || mbr == null) {
+     return false;
+   }
+   return this.initialImageProvider.equals(mbr);
+ }
+
+ /**
+ * Test method for getting the set of recorded version tags.
+ * @return the tracker's own map of bulk-op holders, not a copy
+ */
+ protected ConcurrentMap<ThreadIdentifier, BulkOpHolder> getRecordedBulkOpVersionTags() {
+ return recordedBulkOpVersionTags;
+ }
+
+ /**
+  * @return the tracker's name followed by its initialized flag
+  */
+ @Override
+ public String toString() {
+   final StringBuilder text = new StringBuilder();
+   text.append(this.name);
+   text.append("(initialized=").append(this.initialized).append(")");
+   return text.toString();
+ }
+
+ /**
+  * A sequence number tracker to keep events from clients from being
+  * re-applied to the cache if they've already been seen. Only the sequence
+  * number and the version tag are written by toData and read by fromData.
+  * @author bruce
+  * @since 5.5
+  */
+ static class EventSeqnoHolder implements DataSerializable {
+   private static final long serialVersionUID = 8137262960763308046L;
+
+   /** last event sequence number recorded; -1 until one is set */
+   long lastSeqno = -1;
+
+   /** millisecond timestamp; zero means the sweeper has not stamped this holder yet */
+   transient long endOfLifeTimer;
+
+   /** whether this entry is being removed */
+   transient boolean removed;
+
+   /**
+    * version tag, if any, for the operation
+    */
+   VersionTag versionTag;
+
+   /** No-argument constructor; fields keep their defaults until fromData fills them in. */
+   public EventSeqnoHolder() {
+   }
+
+   /**
+    * @param id the sequence number to hold
+    * @param versionTag the version tag of the operation, may be null
+    */
+   EventSeqnoHolder(long id, VersionTag versionTag) {
+     this.versionTag = versionTag;
+     this.lastSeqno = id;
+   }
+
+   @Override
+   public String toString() {
+     String text = "seqNo" + this.lastSeqno;
+     if (this.versionTag != null) {
+       text = text + "," + this.versionTag;
+     }
+     return text;
+   }
+
+   /** Reads the sequence number followed by the version tag. */
+   public void fromData(DataInput in) throws IOException,
+       ClassNotFoundException {
+     this.lastSeqno = in.readLong();
+     this.versionTag = (VersionTag)DataSerializer.readObject(in);
+   }
+
+   /** Writes the sequence number followed by the version tag. */
+   public void toData(DataOutput out) throws IOException {
+     out.writeLong(this.lastSeqno);
+     DataSerializer.writeObject(this.versionTag, out);
+   }
+ }
+
+ /**
+  * A status tracker for each bulk operation (putAll or removeAll) from originators specified by
+  * membershipID and threadID in the cache.
+  * A status of true means the bulk op has been processed by one thread, so
+  * other threads do not need to redo it.
+  * @author Gester
+  * @since 5.7
+  */
+ static class BulkOpProcessed {
+   /** whether the op is processed */
+   private boolean processed;
+
+   /**
+    * Create a status holder for a bulk op.
+    * @param status true if the op has been processed
+    */
+   BulkOpProcessed(boolean status) {
+     processed = status;
+   }
+
+   /**
+    * Change the recorded status.
+    * @param status true if the op has been processed
+    */
+   void setStatus(boolean status) {
+     processed = status;
+   }
+
+   /**
+    * Peek at the recorded status.
+    * @return current status
+    */
+   boolean getStatus() {
+     return processed;
+   }
+
+   @Override
+   public String toString() {
+     return new StringBuilder("BULKOP(").append(processed).append(")").toString();
+   }
+ }
+
+ /**
+  * A holder for the version tags generated for a bulk operation (putAll or removeAll). These
+  * version tags are retrieved when a bulk op is retried.
+  * @author Dan
+  * @since 7.0
+  * protected for test purposes only.
+  */
+ protected static class BulkOpHolder {
+   /**
+    * Whether this object was removed by the cleanup thread.
+    */
+   public boolean removed;
+   /**
+    * version tags keyed by event id; public for tests only
+    */
+   public Map<EventID, VersionTag> entryVersionTags = new HashMap<EventID, VersionTag>();
+   /** millisecond timestamp; reset to zero whenever a tag is added */
+   transient long endOfLifeTimer;
+
+   /**
+    * Create an empty holder.
+    */
+   BulkOpHolder() {
+     super();
+   }
+
+   /**
+    * Store the version tag of one event and reset the end-of-life timer.
+    * @param eventId the id of the event the tag belongs to
+    * @param versionTag the tag to keep
+    */
+   public void putVersionTag(EventID eventId, VersionTag versionTag) {
+     this.entryVersionTags.put(eventId, versionTag);
+     endOfLifeTimer = 0;
+   }
+
+   @Override
+   public String toString() {
+     return "BulkOpHolder tags=" + entryVersionTags;
+   }
+ }
+
+ /**
+  * Timer task that sweeps the registered event trackers. On each run a holder
+  * whose end-of-life timer is zero is stamped with the current time, and a
+  * holder whose stamp is at least expiryTime milliseconds old is flagged as
+  * removed and taken out of its tracker's map.
+  */
+ static class ExpiryTask extends SystemTimerTask {
+
+   GemFireCacheImpl cache;
+   long expiryTime;
+   /** registered trackers; every access is made while holding this list's monitor */
+   List<EventTracker> trackers = new LinkedList<EventTracker>();
+
+   /**
+    * @param cache the cache this task belongs to
+    * @param expiryTime how long, in milliseconds, a stamped holder is kept
+    */
+   public ExpiryTask(GemFireCacheImpl cache, long expiryTime) {
+     this.cache = cache;
+     this.expiryTime = expiryTime;
+   }
+   /** Register a tracker to be swept. */
+   void addTracker(EventTracker tracker) {
+     synchronized(trackers) {
+       trackers.add(tracker);
+     }
+   }
+   /** Unregister a tracker. */
+   void removeTracker(EventTracker tracker) {
+     synchronized(trackers) {
+       trackers.remove(tracker);
+     }
+   }
+   /** @return the number of registered trackers */
+   int getNumberOfTrackers() {
+     // LinkedList is not thread-safe, so read the size under the same lock
+     // that addTracker and removeTracker use
+     synchronized(trackers) {
+       return trackers.size();
+     }
+   }
+   @Override
+   public void run2() {
+     final long now = System.currentTimeMillis();
+     // holders stamped at or before this instant have expired
+     final long timeout = now - expiryTime;
+     final boolean traceEnabled = logger.isTraceEnabled();
+     synchronized(trackers) {
+       for (EventTracker tracker : trackers) {
+         if (traceEnabled) {
+           logger.trace("{} sweeper: starting", tracker.name);
+         }
+         sweepRecordedEvents(tracker, now, timeout, traceEnabled);
+         sweepBulkOps(tracker, now, timeout, traceEnabled);
+         if (traceEnabled) {
+           logger.trace("{} sweeper: done", tracker.name);
+         }
+       }
+     }
+   }
+
+   /** Stamp new sequence number holders of one tracker and remove expired ones. */
+   private static void sweepRecordedEvents(EventTracker tracker, long now, long timeout, boolean traceEnabled) {
+     for (Iterator<Map.Entry<ThreadIdentifier, EventSeqnoHolder>> it = tracker.recordedEvents.entrySet().iterator(); it.hasNext();) {
+       Map.Entry<ThreadIdentifier, EventSeqnoHolder> e = it.next();
+       EventSeqnoHolder evh = e.getValue();
+       synchronized(evh) {
+         if (evh.endOfLifeTimer == 0) {
+           evh.endOfLifeTimer = now; // a new holder - start the timer
+         }
+         if (evh.endOfLifeTimer <= timeout) {
+           // flag the holder under its monitor so that threads already
+           // holding a reference to it can see that it is gone
+           evh.removed = true;
+           evh.lastSeqno = -1;
+           if (traceEnabled) {
+             logger.trace("{} sweeper: removing {}", tracker.name, e.getKey());
+           }
+           it.remove();
+         }
+       }
+     }
+   }
+
+   /** Stamp new bulk operation holders of one tracker and remove expired ones. */
+   private static void sweepBulkOps(EventTracker tracker, long now, long timeout, boolean traceEnabled) {
+     for (Iterator<Map.Entry<ThreadIdentifier, BulkOpHolder>> it = tracker.recordedBulkOpVersionTags.entrySet().iterator(); it.hasNext();) {
+       Map.Entry<ThreadIdentifier, BulkOpHolder> e = it.next();
+       BulkOpHolder evh = e.getValue();
+       synchronized(evh) {
+         if (evh.endOfLifeTimer == 0) {
+           evh.endOfLifeTimer = now; // a new holder - start the timer
+         }
+         // Remove the bulk op holder once it has expired
+         if (evh.endOfLifeTimer <= timeout) {
+           evh.removed = true;
+           if (traceEnabled) {
+             logger.trace("{} sweeper: removing bulkOp {}", tracker.name, e.getKey());
+           }
+           it.remove();
+         }
+       }
+     }
+   }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 201acc0,0000000..201acc0
mode 100644,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index f4216ac,0000000..de49fea
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@@ -1,1511 -1,0 +1,1511 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.GemFireException;
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheTransactionManager;
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
+import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.cache.TransactionInDoubtException;
+import com.gemstone.gemfire.cache.TransactionListener;
+import com.gemstone.gemfire.cache.TransactionWriter;
+import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
+import com.gemstone.gemfire.distributed.TXManagerCancelledException;
+import com.gemstone.gemfire.distributed.internal.DM;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.distributed.internal.MembershipListener;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
+import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
+import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;
+import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap.MapCallback;
+
+/** <p>The internal implementation of the {@link CacheTransactionManager}
+ * interface returned by {@link GemFireCacheImpl#getCacheTransactionManager}.
+ * Internal operations such as
+ * <code>TransactionListener</code> invocation, Region synchronization,
+ * transaction statistics and transaction logging are handled here.
+ *
+ * @author Mitch Thomas
+ *
+ * @since 4.0
+ *
+ * @see CacheTransactionManager
+ */
+public final class TXManagerImpl implements CacheTransactionManager,
+ MembershipListener {
+
+ private static final Logger logger = LogService.getLogger();
+
+ // Thread specific context container
+ private final ThreadLocal<TXStateProxy> txContext;
+ private static TXManagerImpl currentInstance = null;
+ // The unique transaction ID for this Manager
+ private final AtomicInteger uniqId;
+
+ private final DM dm;
+ private final Cache cache;
+
+ // The DistributionMemberID used to construct TXId's
+ private final InternalDistributedMember distributionMgrId;
+
+ private final CachePerfStats cachePerfStats;
+
+ private static final TransactionListener[] EMPTY_LISTENERS =
+ new TransactionListener[0];
+
+ /**
+ * Default transaction id to indicate no transaction
+ */
+ public static final int NOTX = -1;
+
+ private final ArrayList<TransactionListener> txListeners = new ArrayList<TransactionListener>(8);
+ public TransactionWriter writer = null;
+ private boolean closed = false;
+
+ private final Map<TXId, TXStateProxy> hostedTXStates;
+
+ /**
+ * the number of client initiated transactions to store for client failover
+ */
+ public final static int FAILOVER_TX_MAP_SIZE = Integer.getInteger("gemfire.transactionFailoverMapSize", 1000);
+
+ /**
+ * Stores TXCommitMessages for client initiated transactions, so that when a client fails over
+ * (after the delegate dies) the commit message can be sent to the client.
+ * The backing LinkedHashMap drops its eldest entry once its size exceeds
+ * {@link #FAILOVER_TX_MAP_SIZE}, and is wrapped in a synchronized map.
+ * //TODO we really need to keep around only one msg for each thread on a client
+ */
+ @SuppressWarnings("unchecked")
+ private Map<TXId ,TXCommitMessage> failoverMap = Collections.synchronizedMap(new LinkedHashMap<TXId, TXCommitMessage>() {
+ private static final long serialVersionUID = -4156018226167594134L;
+
+ // Eviction hook: returns true (evict the eldest entry) when the map holds
+ // more than FAILOVER_TX_MAP_SIZE entries.
+ protected boolean removeEldestEntry(Entry eldest) {
+ if (logger.isDebugEnabled()) {
+ // logged whenever this hook runs; the trailing boolean tells whether the
+ // eldest entry is really going to be evicted
+ logger.debug("TX: removing client initiated transaction from failover map:{} :{}", eldest.getKey(), (size()>FAILOVER_TX_MAP_SIZE));
+ }
+ return size() > FAILOVER_TX_MAP_SIZE;
+ };
+ });
+
+ /**
+ * A flag to allow persistent transactions. public for testing.
+ */
+ public static boolean ALLOW_PERSISTENT_TRANSACTIONS = Boolean.getBoolean("gemfire.ALLOW_PERSISTENT_TRANSACTIONS");
+
+ /**
+ * this keeps track of all the transactions that were initiated locally.
+ */
+ private ConcurrentMap<TXId, TXStateProxy> localTxMap = new ConcurrentHashMap<TXId, TXStateProxy>();
+
+ /**
+ * the time in minutes after which any suspended transaction are rolled back. default is 30 minutes
+ */
+ private volatile long suspendedTXTimeout = Long.getLong("gemfire.suspendedTxTimeout", 30);
+
+ /**
+ * Thread-specific flag to indicate whether the transactions managed by this
+ * CacheTransactionManager for this thread should be distributed
+ */
+ private final ThreadLocal<Boolean> isTXDistributed;
+
+
+ /**
+  * Creates the {@link CacheTransactionManager} implementation for the given
+  * cache and publishes it as the static current instance. Only one instance
+  * is expected per {@link com.gemstone.gemfire.cache.Cache}.
+  *
+  * @param cachePerfStats the statistics object this manager reports to
+  * @param cache the owning cache; its distributed system is cast to
+  *        {@link InternalDistributedSystem} to obtain the distribution manager
+  */
+ public TXManagerImpl(
+     CachePerfStats cachePerfStats,
+     Cache cache) {
+   final InternalDistributedSystem system =
+       (InternalDistributedSystem) cache.getDistributedSystem();
+   this.cache = cache;
+   this.cachePerfStats = cachePerfStats;
+   this.dm = system.getDistributionManager();
+   this.distributionMgrId = this.dm.getDistributionManagerId();
+   this.uniqId = new AtomicInteger(0);
+   this.hostedTXStates = new HashMap<TXId, TXStateProxy>();
+   this.txContext = new ThreadLocal<TXStateProxy>();
+   this.isTXDistributed = new ThreadLocal<Boolean>();
+   currentInstance = this;
+ }
+
+ /**
+  * @return the cache this transaction manager was created for
+  */
+ final Cache getCache() {
+   return cache;
+ }
+
+
+ /**
+  * Returns the {@link TransactionWriter} currently set on this manager.
+  *
+  * @return the current TransactionWriter, or null if none has been set
+  * @see TransactionWriter
+  */
+ public final TransactionWriter getWriter() {
+   return this.writer;
+ }
+
+
+ /**
+  * Sets the {@link TransactionWriter} for this manager.
+  *
+  * @param writer the writer to install
+  * @throws IllegalStateException if the owning cache is a client cache
+  */
+ public final void setWriter(TransactionWriter writer) {
+   final GemFireCacheImpl owner = (GemFireCacheImpl) this.cache;
+   if (owner.isClient()) {
+     throw new IllegalStateException(LocalizedStrings.TXManager_NO_WRITER_ON_CLIENT.toLocalizedString());
+   }
+   this.writer = writer;
+ }
+
+
+ /**
+  * Returns the single registered transaction listener.
+  *
+  * @return the only registered listener, or null when none is registered
+  * @throws IllegalStateException if more than one listener is registered
+  */
+ public final TransactionListener getListener() {
+   synchronized (this.txListeners) {
+     final int count = this.txListeners.size();
+     if (count > 1) {
+       throw new IllegalStateException(LocalizedStrings.TXManagerImpl_MORE_THAN_ONE_TRANSACTION_LISTENER_EXISTS.toLocalizedString());
+     }
+     return count == 1 ? this.txListeners.get(0) : null;
+   }
+ }
+
+ /**
+  * Returns a snapshot of the registered transaction listeners.
+  *
+  * @return a new array holding the listeners, or the shared empty array when
+  *         none are registered
+  */
+ public TransactionListener[] getListeners() {
+   synchronized (this.txListeners) {
+     if (this.txListeners.size() == 0) {
+       return EMPTY_LISTENERS;
+     }
+     final TransactionListener[] snapshot = new TransactionListener[this.txListeners.size()];
+     this.txListeners.toArray(snapshot);
+     return snapshot;
+   }
+ }
+
+ /**
+  * Replaces the registered listener(s) with the given one and closes the
+  * listener that was registered before.
+  *
+  * @param newListener the listener to install; null leaves no listener registered
+  * @return the previously registered listener, or null if there was none
+  * @throws IllegalStateException if more than one listener was registered
+  */
+ public TransactionListener setListener(TransactionListener newListener) {
+   synchronized (this.txListeners) {
+     final TransactionListener previous = getListener();
+     this.txListeners.clear();
+     if (null != newListener) {
+       this.txListeners.add(newListener);
+     }
+     if (null != previous) {
+       closeListener(previous);
+     }
+     return previous;
+   }
+ }
+ /**
+  * Registers a transaction listener unless it is already registered.
+  *
+  * @param aListener the listener to add
+  * @throws IllegalArgumentException if aListener is null
+  */
+ public void addListener(TransactionListener aListener) {
+   if (null == aListener) {
+     throw new IllegalArgumentException(LocalizedStrings.TXManagerImpl_ADDLISTENER_PARAMETER_WAS_NULL.toLocalizedString());
+   }
+   synchronized (this.txListeners) {
+     final boolean alreadyRegistered = this.txListeners.contains(aListener);
+     if (alreadyRegistered) {
+       return;
+     }
+     this.txListeners.add(aListener);
+   }
+ }
+ /**
+  * Unregisters a transaction listener and closes it if it was registered.
+  *
+  * @param aListener the listener to remove
+  * @throws IllegalArgumentException if aListener is null
+  */
+ public void removeListener(TransactionListener aListener) {
+   if (null == aListener) {
+     throw new IllegalArgumentException(LocalizedStrings.TXManagerImpl_REMOVELISTENER_PARAMETER_WAS_NULL.toLocalizedString());
+   }
+   synchronized (this.txListeners) {
+     final boolean wasRegistered = this.txListeners.remove(aListener);
+     if (wasRegistered) {
+       closeListener(aListener);
+     }
+   }
+ }
+ /**
+  * Closes and drops every registered listener, then registers the given ones.
+  * The existing listeners are already closed and removed when the null-element
+  * check fails.
+  *
+  * @param newListeners the listeners to register; null or empty registers none
+  * @throws IllegalArgumentException if newListeners contains a null element
+  */
+ public void initListeners(TransactionListener[] newListeners) {
+   synchronized (this.txListeners) {
+     for (TransactionListener registered : this.txListeners) {
+       closeListener(registered);
+     }
+     this.txListeners.clear();
+     if (newListeners == null || newListeners.length == 0) {
+       return;
+     }
+     final List<TransactionListener> replacements = Arrays.asList(newListeners);
+     if (replacements.contains(null)) {
+       throw new IllegalArgumentException(LocalizedStrings.TXManagerImpl_INITLISTENERS_PARAMETER_HAD_A_NULL_ELEMENT.toLocalizedString());
+     }
+     this.txListeners.addAll(replacements);
+   }
+ }
+
+ /**
+  * @return the statistics object passed to the constructor
+  */
+ final CachePerfStats getCachePerfStats() {
+   return cachePerfStats;
+ }
+
+ /**
+  * Starts a transaction on the calling thread: builds a new {@link TXId},
+  * wraps it in a transaction state proxy, associates the proxy with the
+  * current thread through a {@link ThreadLocal} and records it in the map of
+  * locally started transactions.
+  *
+  * @throws IllegalStateException if the thread already has a transaction
+  */
+ public void begin() {
+   checkClosed();
+   final TransactionId active = getTransactionId();
+   if (active != null) {
+     throw new java.lang.IllegalStateException(LocalizedStrings.TXManagerImpl_TRANSACTION_0_ALREADY_IN_PROGRESS.toLocalizedString(active));
+   }
+   final TXId newId = new TXId(this.distributionMgrId, this.uniqId.incrementAndGet());
+   final TXStateProxyImpl state;
+   if (isDistributed()) {
+     state = new DistTXStateProxyImplOnCoordinator(this, newId, null);
+   } else {
+     state = new TXStateProxyImpl(this, newId, null);
+   }
+   setTXState(state);
+   this.localTxMap.put(newId, state);
+ }
+
+
+ /**
+  * Starts a transaction flagged for enlistment with a JTA transaction: builds
+  * a new {@link TXId}, wraps it in a transaction state proxy and associates
+  * the proxy with the current thread through a {@link ThreadLocal}. Should
+  * only be called in a context where we know there is no existing
+  * transaction.
+  *
+  * @return the newly created transaction state
+  */
+ public TXStateProxy beginJTA() {
+   checkClosed();
+   final TXId newId = new TXId(this.distributionMgrId, this.uniqId.incrementAndGet());
+   final TXStateProxy state;
+   if (isDistributed()) {
+     state = new DistTXStateProxyImplOnCoordinator(this, newId, true);
+   } else {
+     state = new TXStateProxyImpl(this, newId, true);
+   }
+   setTXState(state);
+   return state;
+ }
+
+ /**
+  * Runs the precommit step of the transaction associated with the current
+  * thread. Only applicable for Distributed transaction.
+  *
+  * @throws IllegalStateException if the thread has no active transaction
+  */
+ public void precommit() throws CommitConflictException {
+   checkClosed();
+
+   final TXStateProxy active = getTXState();
+   if (null == active) {
+     throw new IllegalStateException(LocalizedStrings.TXManagerImpl_THREAD_DOES_NOT_HAVE_AN_ACTIVE_TRANSACTION.toLocalizedString());
+   }
+
+   final String jtaMessage = LocalizedStrings.TXManagerImpl_CAN_NOT_COMMIT_THIS_TRANSACTION_BECAUSE_IT_IS_ENLISTED_WITH_A_JTA_TRANSACTION_USE_THE_JTA_MANAGER_TO_PERFORM_THE_COMMIT.toLocalizedString();
+   active.checkJTA(jtaMessage);
+
+   active.precommit();
+ }
+
+ /** Complete the transaction associated with the current
+ * thread. When this method completes, the thread is no longer
+ * associated with a transaction, except when the commit fails with an
+ * {@link UnsupportedOperationInTransactionException}: in that case the
+ * transaction is associated with the thread again before the exception
+ * is rethrown.
+ *
+ * @throws IllegalStateException if the thread has no active transaction
+ * @throws CommitConflictException rethrown from the underlying commit after
+ * the failure has been saved for client failover and recorded
+ */
+ public void commit() throws CommitConflictException {
+ checkClosed();
+
+ final TXStateProxy tx = getTXState();
+ if (tx == null) {
+ throw new IllegalStateException(LocalizedStrings.TXManagerImpl_THREAD_DOES_NOT_HAVE_AN_ACTIVE_TRANSACTION.toLocalizedString());
+ }
+
+ tx.checkJTA(LocalizedStrings.TXManagerImpl_CAN_NOT_COMMIT_THIS_TRANSACTION_BECAUSE_IT_IS_ENLISTED_WITH_A_JTA_TRANSACTION_USE_THE_JTA_MANAGER_TO_PERFORM_THE_COMMIT.toLocalizedString());
+
+ final long opStart = CachePerfStats.getStatTime();
+ final long lifeTime = opStart - tx.getBeginTime();
+ try {
+ // detach the transaction from the thread before committing
+ setTXState(null);
+ tx.commit();
+ } catch (CommitConflictException ex) {
+ // conflict: remember the conflict token, record the failure, clean up
+ saveTXStateForClientFailover(tx, TXCommitMessage.CMT_CONFLICT_MSG); //fixes #43350
+ noteCommitFailure(opStart, lifeTime, tx);
+ cleanup(tx.getTransactionId()); // fixes #52086
+ throw ex;
+ } catch (TransactionDataRebalancedException reb) {
+ // rebalance: remember the rebalance token and clean up
+ // (noteCommitFailure is not called on this path)
+ saveTXStateForClientFailover(tx, TXCommitMessage.REBALANCE_MSG);
+ cleanup(tx.getTransactionId()); // fixes #52086
+ throw reb;
+ } catch (UnsupportedOperationInTransactionException e) {
+ // fix for #42490
+ // re-attach the transaction to the thread; nothing is saved or cleaned up
+ setTXState(tx);
+ throw e;
+ } catch (RuntimeException e) {
+ // any other runtime failure: remember the generic exception token
+ saveTXStateForClientFailover(tx, TXCommitMessage.EXCEPTION_MSG);
+ cleanup(tx.getTransactionId()); // fixes #52086
+ throw e;
+ }
+ // success path: save the commit message, clean up, then record the success
+ saveTXStateForClientFailover(tx);
+ cleanup(tx.getTransactionId());
+ noteCommitSuccess(opStart, lifeTime, tx);
+ }
+
+ /**
+  * Records a failed commit in the statistics and, when the transaction fires
+  * callbacks, invokes afterFailedCommit on every registered listener.
+  * Listener exceptions other than VirtualMachineError are logged and do not
+  * stop the remaining listeners from being invoked.
+  *
+  * @param opStart stat time at which the commit started
+  * @param lifeTime time between transaction begin and commit start
+  * @param tx the transaction whose commit failed
+  */
+ final void noteCommitFailure(long opStart, long lifeTime, TXStateInterface tx) {
+   final long finished = CachePerfStats.getStatTime();
+   this.cachePerfStats.txFailure(finished - opStart, lifeTime, tx.getChanges());
+   final TransactionListener[] registered = getListeners();
+   if (!tx.isFireCallbacks() || registered.length == 0) {
+     return;
+   }
+   final TXEvent event = tx.getEvent();
+   try {
+     for (final TransactionListener listener : registered) {
+       try {
+         listener.afterFailedCommit(event);
+       } catch (VirtualMachineError fatal) {
+         SystemFailure.initiateFailure(fatal);
+         // If initiateFailure ever returns, rethrow: this thread must not
+         // carry on after a VM error.
+         throw fatal;
+       } catch (Throwable failure) {
+         // A Throwable may be part of a cascading failure, so check that
+         // the JVM is still usable before just logging it.
+         SystemFailure.checkFailure();
+         logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), failure);
+       }
+     }
+   } finally {
+     event.release();
+   }
+ }
+
+ /**
+  * Records a successful commit in the statistics and, when the transaction
+  * fires callbacks, invokes afterCommit on every registered listener.
+  * Listener exceptions other than VirtualMachineError are logged and do not
+  * stop the remaining listeners from being invoked.
+  *
+  * @param opStart stat time at which the commit started
+  * @param lifeTime time between transaction begin and commit start
+  * @param tx the committed transaction
+  */
+ final void noteCommitSuccess(long opStart, long lifeTime, TXStateInterface tx) {
+   final long finished = CachePerfStats.getStatTime();
+   this.cachePerfStats.txSuccess(finished - opStart, lifeTime, tx.getChanges());
+   final TransactionListener[] registered = getListeners();
+   if (!tx.isFireCallbacks() || registered.length == 0) {
+     return;
+   }
+   final TXEvent event = tx.getEvent();
+   try {
+     for (int idx = 0; idx < registered.length; idx++) {
+       try {
+         registered[idx].afterCommit(event);
+       } catch (VirtualMachineError fatal) {
+         SystemFailure.initiateFailure(fatal);
+         // If initiateFailure ever returns, rethrow: this thread must not
+         // carry on after a VM error.
+         throw fatal;
+       } catch (Throwable failure) {
+         // A Throwable may be part of a cascading failure, so check that
+         // the JVM is still usable before just logging it.
+         SystemFailure.checkFailure();
+         logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), failure);
+       }
+     }
+   } finally {
+     event.release();
+   }
+ }
+
+ /**
+  * Prepares for transaction replay by giving the current thread's
+  * transaction proxy a freshly generated transaction id.
+  */
+ private void _incrementTXUniqueIDForReplay() {
+   final TXStateProxyImpl replaying = (TXStateProxyImpl) getTXState();
+   assert replaying != null : "expected a transaction to be in progress";
+   final TXId replayId = new TXId(this.distributionMgrId, this.uniqId.incrementAndGet());
+   replaying.setTXIDForReplay(replayId);
+ }
+
+
+ /**
+  * Rolls back the transaction associated with the current thread. When this
+  * method completes, the thread is no longer associated with a transaction.
+  *
+  * @throws IllegalStateException if the thread has no active transaction
+  */
+ public void rollback() {
+   checkClosed();
+   final TXStateProxy active = getTXState();
+   if (null == active) {
+     throw new IllegalStateException(LocalizedStrings.TXManagerImpl_THREAD_DOES_NOT_HAVE_AN_ACTIVE_TRANSACTION.toLocalizedString());
+   }
+
+   final String jtaMessage = LocalizedStrings.TXManagerImpl_CAN_NOT_ROLLBACK_THIS_TRANSACTION_IS_ENLISTED_WITH_A_JTA_TRANSACTION_USE_THE_JTA_MANAGER_TO_PERFORM_THE_ROLLBACK.toLocalizedString();
+   active.checkJTA(jtaMessage);
+
+   final long started = CachePerfStats.getStatTime();
+   final long age = started - active.getBeginTime();
+   // detach from the thread first, then roll back and tidy up
+   setTXState(null);
+   active.rollback();
+   saveTXStateForClientFailover(active);
+   cleanup(active.getTransactionId());
+   noteRollbackSuccess(started, age, active);
+ }
+
+ /**
+  * Records a rollback in the statistics and, when the transaction fires
+  * callbacks, invokes afterRollback on every registered listener. Listener
+  * exceptions other than VirtualMachineError are logged and do not stop the
+  * remaining listeners from being invoked.
+  *
+  * @param opStart stat time at which the rollback started
+  * @param lifeTime time between transaction begin and rollback start
+  * @param tx the rolled back transaction
+  */
+ final void noteRollbackSuccess(long opStart, long lifeTime, TXStateInterface tx) {
+   final long finished = CachePerfStats.getStatTime();
+   this.cachePerfStats.txRollback(finished - opStart, lifeTime, tx.getChanges());
+   final TransactionListener[] registered = getListeners();
+   if (!tx.isFireCallbacks() || registered.length == 0) {
+     return;
+   }
+   final TXEvent event = tx.getEvent();
+   try {
+     for (final TransactionListener listener : registered) {
+       try {
+         listener.afterRollback(event);
+       } catch (VirtualMachineError fatal) {
+         SystemFailure.initiateFailure(fatal);
+         // If initiateFailure ever returns, rethrow: this thread must not
+         // carry on after a VM error.
+         throw fatal;
+       } catch (Throwable failure) {
+         // A Throwable may be part of a cascading failure, so check that
+         // the JVM is still usable before just logging it.
+         SystemFailure.checkFailure();
+         logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), failure);
+       }
+     }
+   } finally {
+     event.release();
+   }
+ }
+
+ /**
+ * Called from Commit and Rollback to unblock waiting threads.
+ * Removes the transaction from the map of locally started transactions,
+ * closes its proxy if one was registered, and unparks every thread queued
+ * in the wait map for this transaction id.
+ *
+ * @param txId id of the transaction that has just completed
+ */
+ private void cleanup(TransactionId txId) {
+ TXStateProxy proxy = this.localTxMap.remove(txId);
+ if (proxy != null) {
+ proxy.close();
+ }
+ Queue<Thread> waitingThreads = this.waitMap.get(txId);
+ if (waitingThreads != null && !waitingThreads.isEmpty()) {
+ for (Thread waitingThread : waitingThreads) {
+ LockSupport.unpark(waitingThread);
+ }
+ // the wait-map entry is dropped only when at least one waiter was queued;
+ // an existing but empty queue stays in the map
+ waitMap.remove(txId);
+ }
+ }
+
+ /**
+  * Reports the existence of a transaction for this thread.
+  *
+  * @return true if the calling thread has a transaction state
+  */
+ public boolean exists() {
+   return getTXState() != null;
+ }
+
+ /**
+  * Gets the identifier of the calling thread's transaction.
+  *
+  * @return the current transaction identifier, or null if no transaction exists
+  */
+ public TransactionId getTransactionId() {
+   final TXStateProxy active = getTXState();
+   if (active == null) {
+     return null;
+   }
+   return active.getTransactionId();
+ }
+
+ /**
+  * Returns the transaction state proxy of the current thread. A proxy that
+  * is no longer in progress is dropped from the thread context and reported
+  * as absent.
+  *
+  * @return the current thread's TXStateProxy; null if no transaction
+  */
+ public final TXStateProxy getTXState() {
+   final TXStateProxy current = txContext.get();
+   if (current == null || current.isInProgress()) {
+     return current;
+   }
+   this.txContext.set(null);
+   return null;
+ }
+
+ /**
+  * Calls {@link TXStateProxy#setInProgress(boolean)} on the current thread's
+  * transaction state, if there is one.
+  * This method must only be used in fail-over scenarios.
+  *
+  * @param progress value of the progress flag to be set
+  * @return the previous value of the inProgress flag; false when the thread
+  *         has no transaction state
+  * @see TXStateProxy#setInProgress(boolean)
+  */
+ public boolean setInProgress(boolean progress) {
+   final TXStateProxy current = txContext.get();
+   if (current == null) {
+     return false;
+   }
+   final boolean previous = current.isInProgress();
+   current.setInProgress(progress);
+   return previous;
+ }
+
+ /**
+  * Associates the given transaction state with the calling thread.
+  *
+  * @param val the state to associate; null clears the association
+  */
+ public final void setTXState(TXStateProxy val) {
+   this.txContext.set(val);
+ }
+
+
+ /**
+  * Closes this transaction manager: marks it closed, closes every hosted and
+  * every locally started transaction state, and finally closes the
+  * registered transaction listeners. Calling it again once the manager is
+  * closed has no effect.
+  */
+ public void close() {
+   if (isClosed()) {
+     return;
+   }
+   this.closed = true;
+   // hostedTXStates is a plain HashMap that other threads modify while
+   // holding its monitor, so it has to be iterated under that same monitor.
+   synchronized (this.hostedTXStates) {
+     for (TXStateProxy proxy : this.hostedTXStates.values()) {
+       proxy.close();
+     }
+   }
+   for (TXStateProxy proxy : this.localTxMap.values()) {
+     proxy.close();
+   }
+   // getListeners() hands back a snapshot array
+   for (TransactionListener listener : getListeners()) {
+     closeListener(listener);
+   }
+ }
+ /**
+  * Closes a transaction listener. Anything it throws other than a
+  * VirtualMachineError is logged and not propagated.
+  *
+  * @param tl the listener to close
+  */
+ private void closeListener(TransactionListener tl) {
+   try {
+     tl.close();
+   } catch (VirtualMachineError fatal) {
+     SystemFailure.initiateFailure(fatal);
+     // If initiateFailure ever returns, rethrow: this thread must not carry
+     // on after a VM error.
+     throw fatal;
+   } catch (Throwable failure) {
+     // A Throwable may be part of a cascading failure, so check that the
+     // JVM is still usable before just logging it.
+     SystemFailure.checkFailure();
+     logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), failure);
+   }
+ }
+ /**
+  * Suspends the calling thread's transaction, if it has one, so that the
+  * thread is no longer in a transaction.
+  *
+  * @return the state of the suspended transaction or null. Pass this value
+  *         to {@link TXManagerImpl#resume} to reactivate the suspended
+  *         transaction.
+  */
+ public final TXStateProxy internalSuspend() {
+   final TXStateProxy active = getTXState();
+   if (active == null) {
+     return null;
+   }
+   active.suspend();
+   setTXState(null);
+   return active;
+ }
+ /**
+  * Activates the specified transaction on the calling thread and cancels the
+  * expiry task registered for it, if any. A null argument is ignored.
+  *
+  * @param tx the transaction to activate.
+  * @throws IllegalStateException if this thread already has an active
+  *         transaction, or if tx is a TXState instance
+  */
+ public final void resume(TXStateProxy tx) {
+   if (tx == null) {
+     return;
+   }
+   final TransactionId active = getTransactionId();
+   if (active != null) {
+     throw new java.lang.IllegalStateException(LocalizedStrings.TXManagerImpl_TRANSACTION_0_ALREADY_IN_PROGRESS.toLocalizedString(active));
+   }
+   if (tx instanceof TXState) {
+     throw new java.lang.IllegalStateException("Found instance of TXState: " + tx);
+   }
+   setTXState(tx);
+   tx.resume();
+   final SystemTimerTask pendingExpiry = this.expiryTasks.remove(tx.getTransactionId());
+   if (pendingExpiry != null) {
+     pendingExpiry.cancel();
+   }
+ }
+
+ /**
+  * @return true once {@link #close()} has marked this manager closed
+  */
+ private final boolean isClosed() {
+   return closed;
+ }
+ /**
+  * Verifies that the manager can still be used: first consults the cache's
+  * cancel criterion, then the closed flag.
+  *
+  * @throws TXManagerCancelledException if this manager has been closed
+  */
+ private final void checkClosed() {
+   this.cache.getCancelCriterion().checkCancelInProgress(null);
+   if (isClosed()) {
+     throw new TXManagerCancelledException("This transaction manager is closed.");
+   }
+ }
+
+ /**
+  * @return the distribution manager obtained from the cache's distributed system
+  */
+ final DM getDM() {
+   return dm;
+ }
+
+
+ /**
+  * @return the unique id of the calling thread's transaction as seen by the
+  *         current manager instance, or {@link #NOTX} when no manager
+  *         instance exists
+  */
+ public static int getCurrentTXUniqueId() {
+   return currentInstance == null ? NOTX : currentInstance.getMyTXUniqueId();
+ }
+
+
+
+
+ /**
+  * @return the calling thread's transaction state as seen by the current
+  *         manager instance, or null when no manager instance exists
+  */
+ public final static TXStateProxy getCurrentTXState() {
+   return currentInstance == null ? null : currentInstance.getTXState();
+ }
+
+ /**
+  * Assigns a new transaction id to the calling thread's transaction on the
+  * current manager instance; does nothing when no manager instance exists.
+  */
+ public static void incrementTXUniqueIDForReplay() {
+   if (currentInstance == null) {
+     return;
+   }
+   currentInstance._incrementTXUniqueIDForReplay();
+ }
+
+ /**
+  * @return the unique id of the transaction state stored for the calling
+  *         thread, or {@link #NOTX} when none is stored
+  */
+ public int getMyTXUniqueId() {
+   final TXStateProxy current = txContext.get();
+   if (current == null) {
+     return NOTX;
+   }
+   return current.getTxId().getUniqId();
+ }
+
+ /**
+ * Associate the remote txState with the thread processing this message. Also,
+ * we acquire a lock on the txState, on which this thread operates.
+ * Some messages like SizeMessage should not create a new txState.
+ * @param msg the transactional message being processed
+ * @return {@link TXStateProxy} the txProxy for the transactional message;
+ * null when the message carries no transaction id, cannot participate in a
+ * transaction, or no state is hosted and the message may not start one
+ * @throws InterruptedException
+ */
+ public TXStateProxy masqueradeAs(TransactionMessage msg) throws InterruptedException {
+ if (msg.getTXUniqId() == NOTX || !msg.canParticipateInTransaction()) {
+ return null;
+ }
+ TXId key = new TXId(msg.getMemberToMasqueradeAs(), msg.getTXUniqId());
+ TXStateProxy val;
+ // first look-up is done without the hostedTXStates monitor; it is repeated
+ // under the monitor below before a new state is created
+ // NOTE(review): hostedTXStates is a plain HashMap - confirm that this
+ // unsynchronized read is acceptable
+ val = this.hostedTXStates.get(key);
+ if (val == null) {
+ synchronized(this.hostedTXStates) {
+ val = this.hostedTXStates.get(key);
+ if (val == null && msg.canStartRemoteTransaction()) {
+ // create and host a new state, distributed or not as the message says
+ if (msg.isTransactionDistributed()) {
+ val = new DistTXStateProxyImplOnDatanode(this, key, msg.getTXOriginatorClient());
+ val.setLocalTXState(new DistTXState(val,true));
+ } else {
+ val = new TXStateProxyImpl(this, key, msg.getTXOriginatorClient());
+ val.setLocalTXState(new TXState(val,true));
+ }
+ this.hostedTXStates.put(key, val);
+ }
+ }
+ }
+ if (val != null) {
+ // take the state's lock unless this thread already holds it
+ if (!val.getLock().isHeldByCurrentThread()) {
+ val.getLock().lock();
+ }
+ }
+
+ // val may still be null here, which clears the thread's association
+ setTXState(val);
+ return val;
+ }
+
+ /**
+ * Associate the remote txState with the thread processing this message. Also,
+ * we acquire a lock on the txState, on which this thread operates.
+ * Some messages like SizeMessage should not create a new txState.
+ * NOTE(review): the merged code below creates a state whenever none is
+ * hosted for the key - confirm the sentence above still applies here.
+ * @param msg the client message being processed
+ * @param memberId member used, together with the message's transaction id,
+ * to build the transaction key
+ * @param probeOnly - do not masquerade; just look up the TX state
+ * @return {@link TXStateProxy} the txProxy for the transactional message;
+ * null when the message carries no transaction id
+ * @throws InterruptedException
+ */
+ public TXStateProxy masqueradeAs(Message msg,InternalDistributedMember memberId, boolean probeOnly) throws InterruptedException {
+ if (msg.getTransactionId() == NOTX) {
+ return null;
+ }
+ TXId key = new TXId(memberId, msg.getTransactionId());
+ TXStateProxy val;
+ // first look-up is done without the hostedTXStates monitor; it is repeated
+ // under the monitor below before a new state is created
+ val = this.hostedTXStates.get(key);
+ if (val == null) {
+ synchronized(this.hostedTXStates) {
+ val = this.hostedTXStates.get(key);
- if (val == null && msg.canStartRemoteTransaction()) {
++ if (val == null) {
+ // [sjigyasu] TODO: Conditionally create object based on distributed or non-distributed tx mode
+ if (msg instanceof TransactionMessage && ((TransactionMessage)msg).isTransactionDistributed()) {
+ val = new DistTXStateProxyImplOnDatanode(this, key, memberId);
+ //val.setLocalTXState(new DistTXState(val,true));
+ } else {
+ val = new TXStateProxyImpl(this, key, memberId);
+ //val.setLocalTXState(new TXState(val,true));
+ }
+ this.hostedTXStates.put(key, val);
+ }
+ }
+ }
+ // when only probing, the state is returned without locking it or
+ // associating it with the thread
+ if (!probeOnly) {
+ if (val != null) {
+ if (!val.getLock().isHeldByCurrentThread()) {
+ val.getLock().lock();
+ // add the TXStateProxy back to the map
+ // in-case another thread removed it while we were waiting to lock.
+ // This can happen during client transaction failover.
+ synchronized (this.hostedTXStates) {
+ this.hostedTXStates.put(key, val);
+ }
+ }
+ }
+ setTXState(val);
+ }
+ return val;
+ }
+
+
+
+
+ /**
+  * Associates the given transactional state with this thread, taking the
+  * state's lock first unless the thread already holds it.
+  *
+  * @param txState the transactional state; must not be null
+  */
+ public void masqueradeAs(TXStateProxy txState) {
+   assert txState != null;
+   final boolean alreadyHeld = txState.getLock().isHeldByCurrentThread();
+   if (!alreadyHeld) {
+     txState.getLock().lock();
+   }
+   setTXState(txState);
+ }
+
+ /**
+  * Removes the association created by {@link #masqueradeAs(TransactionMessage)}
+  * and releases the state's lock. A null argument is ignored.
+  *
+  * @param tx the state to detach from the calling thread
+  */
+ public void unmasquerade(TXStateProxy tx) {
+   if (tx == null) {
+     return;
+   }
+   setTXState(null);
+   tx.getLock().unlock();
+ }
+
+ /**
+  * Cleans up the remote txState after commit and rollback: removes it from
+  * the hosted states and closes it.
+  *
+  * @param txId id of the hosted transaction to remove
+  * @return the removed TXStateProxy, or null if none was hosted for txId
+  */
+ public TXStateProxy removeHostedTXState(TXId txId) {
+   synchronized (this.hostedTXStates) {
+     final TXStateProxy removed = this.hostedTXStates.remove(txId);
+     if (null != removed) {
+       removed.close();
+     }
+     return removed;
+   }
+ }
+
+ /**
+  * Called when the CacheServer is shutdown.
+  * Closes and removes every hosted txState that exists on a client's behalf.
+  */
+ protected void removeHostedTXStatesForClients() {
+   synchronized (this.hostedTXStates) {
+     final Iterator<Entry<TXId, TXStateProxy>> hosted = this.hostedTXStates.entrySet().iterator();
+     while (hosted.hasNext()) {
+       final Entry<TXId, TXStateProxy> candidate = hosted.next();
+       final TXStateProxy proxy = candidate.getValue();
+       if (!proxy.isOnBehalfOfClient()) {
+         continue;
+       }
+       proxy.close();
+       if (logger.isDebugEnabled()) {
+         logger.debug("Cleaning up TXStateProxy for {}", candidate.getKey());
+       }
+       hosted.remove();
+     }
+   }
+ }
+
+ /**
+  * Used to verify if a transaction with a given id is hosted by this txManager.
+  *
+  * @param txId id of the transaction to look up
+  * @return true if a state is hosted for txId and it reports
+  *         isRealDealLocal(), false otherwise
+  */
+ public boolean isHostedTxInProgress(TXId txId) {
+   synchronized (this.hostedTXStates) {
+     final TXStateProxy hosted = this.hostedTXStates.get(txId);
+     return hosted != null && hosted.isRealDealLocal();
+   }
+ }
+
+ /**
+  * @param txId id of the hosted transaction to look up
+  * @return the state hosted for txId, or null if there is none
+  */
+ public TXStateProxy getHostedTXState(TXId txId) {
+   synchronized (this.hostedTXStates) {
+     final TXStateProxy hosted = this.hostedTXStates.get(txId);
+     return hosted;
+   }
+ }
+
+ /**
+  * @return number of transactions in progress on behalf of remote nodes,
+  *         i.e. the current size of the hosted state map
+  */
+ public int hostedTransactionsInProgressForTest() {
+   synchronized (this.hostedTXStates) {
+     final int hostedCount = this.hostedTXStates.size();
+     return hostedCount;
+   }
+ }
+ /**
+  * @return the current size of the map of locally started transactions
+  */
+ public int localTransactionsInProgressForTest() {
+   return localTxMap.size();
+ }
+
+ /**
+  * Closes and removes every hosted transaction state whose transaction id
+  * belongs to the departed member.
+  *
+  * @param id the member that departed
+  * @param crashed not used by this implementation
+  */
+ public void memberDeparted(InternalDistributedMember id, boolean crashed) {
+   synchronized (this.hostedTXStates) {
+     final Iterator<Map.Entry<TXId,TXStateProxy>> hosted = this.hostedTXStates.entrySet().iterator();
+     while (hosted.hasNext()) {
+       final Map.Entry<TXId,TXStateProxy> candidate = hosted.next();
+       final TXId hostedId = candidate.getKey();
+       if (!hostedId.getMemberId().equals(id)) {
+         continue;
+       }
+       candidate.getValue().close();
+       if (logger.isDebugEnabled()) {
+         logger.debug("Received memberDeparted, cleaning up txState:{}", hostedId);
+       }
+       hosted.remove();
+     }
+   }
+ }
+
+ /** No-op: this manager does nothing when a member joins. */
+ public void memberJoined(InternalDistributedMember id) {
+ }
+
+ /** No-op: this manager does nothing when quorum is lost. */
+ public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
+ }
+
+ /** No-op: this manager does nothing when a member is suspected. */
+ public void memberSuspect(InternalDistributedMember id,
+ InternalDistributedMember whoSuspected, String reason) {
+ }
+
+
+ /**
+  * Retrieves the ids of the hosted transaction states that belong to the
+  * given client.
+  *
+  * @param id the client's membership ID
+  * @return a new set holding the ids of the currently hosted transactions
+  *         whose member id equals id
+  */
+ public Set<TXId> getTransactionsForClient(InternalDistributedMember id) {
+   final Set<TXId> matches = new HashSet<TXId>();
+   synchronized (this.hostedTXStates) {
+     for (TXId hostedId : this.hostedTXStates.keySet()) {
+       if (hostedId.getMemberId().equals(id)) {
+         matches.add(hostedId);
+       }
+     }
+   }
+   return matches;
+ }
+
+ /**
+  * Closes and removes the hosted transaction states with the given ids.
+  *
+  * @param txIds ids of the transactions to remove
+  * @param distribute when true, a removal message for the same ids is also
+  *        sent to the other distribution managers
+  */
+ public void removeTransactions(Set<TXId> txIds, boolean distribute) {
+   if (logger.isDebugEnabled()) {
+     logger.debug("expiring the following transactions: {}", txIds);
+   }
+   synchronized (this.hostedTXStates) {
+     final Iterator<Map.Entry<TXId, TXStateProxy>> hosted = this.hostedTXStates.entrySet().iterator();
+     while (hosted.hasNext()) {
+       final Map.Entry<TXId,TXStateProxy> candidate = hosted.next();
+       if (!txIds.contains(candidate.getKey())) {
+         continue;
+       }
+       candidate.getValue().close();
+       hosted.remove();
+     }
+   }
+   if (!distribute) {
+     return;
+   }
+   // tell other VMs to also remove the transactions
+   TXRemovalMessage.send(this.dm, this.dm.getOtherDistributionManagerIds(), txIds);
+ }
+
+ /**
+  * Stores the transaction's own commit message in the failover map, but only
+  * for transactions that run on behalf of a client and report
+  * isRealDealLocal().
+  *
+  * @param tx the completed transaction
+  */
+ private void saveTXStateForClientFailover(TXStateProxy tx) {
+   if (!tx.isOnBehalfOfClient() || !tx.isRealDealLocal()) {
+     return;
+   }
+   failoverMap.put(tx.getTxId(), tx.getCommitMessage());
+   if (logger.isDebugEnabled()) {
+     logger.debug("TX: storing client initiated transaction:{}; now there are {} entries in the failoverMap",
+         tx.getTxId(), failoverMap.size());
+   }
+ }
+
+ /**
+  * Stores the given message in the failover map under the transaction's id,
+  * but only for transactions that run on behalf of a client and report
+  * isRealDealLocal().
+  *
+  * @param tx the completed transaction
+  * @param msg the commit message or token to remember for tx
+  */
+ private void saveTXStateForClientFailover(TXStateProxy tx, TXCommitMessage msg) {
+   if (!tx.isOnBehalfOfClient() || !tx.isRealDealLocal()) {
+     return;
+   }
+   failoverMap.put(tx.getTxId(), msg);
+   if (logger.isDebugEnabled()) {
+     logger.debug("TX: storing client initiated transaction:{}; now there are {} entries in the failoverMap",
+         tx.getTxId(), failoverMap.size());
+   }
+ }
+
+ /**
+  * Unconditionally stores the given commit message in the failover map.
+  *
+  * @param txId id of the transaction the message belongs to
+  * @param msg the commit message to remember
+  */
+ public void saveTXCommitMessageForClientFailover(TXId txId, TXCommitMessage msg) {
+   this.failoverMap.put(txId, msg);
+ }
+
+ /**
+  * Reports whether a commit message is stored in the failover map for the
+  * given transaction and, if so, re-inserts it so that it becomes the most
+  * recently added entry.
+  *
+  * @param txId id of the transaction to look up
+  * @return true if a non-null commit message was stored for txId
+  */
+ public boolean isHostedTxRecentlyCompleted(TXId txId) {
+   // if someone is asking to see if we have the txId, they will come
+   // back and ask for the commit message, this could take a long time
+   // specially when called from TXFailoverCommand, so we move
+   // the txId to the front of the queue
+   //
+   // The remove and the put have to happen atomically, otherwise another
+   // thread can look the id up in between and miss it. The map is a
+   // Collections.synchronizedMap, which locks on the map object itself.
+   synchronized (failoverMap) {
+     TXCommitMessage msg = failoverMap.remove(txId);
+     if (msg != null) {
+       failoverMap.put(txId, msg);
+       return true;
+     }
+     return false;
+   }
+ }
+
+
+ /**
+  * Waits for an in-flight completion of the given hosted transaction, if there is one.
+  * The hosted state is looked up once without the lock and, on a miss, once more while
+  * holding the monitor of {@code hostedTXStates}.
+  * @param txId the id of the transaction to wait for
+  * @return true if a wait was performed
+  */
+ public boolean waitForCompletingTransaction(TXId txId) {
+   TXStateProxy proxy = this.hostedTXStates.get(txId);
+   if (proxy == null) {
+     synchronized (this.hostedTXStates) {
+       proxy = this.hostedTXStates.get(txId);
+     }
+   }
+   if (proxy == null || !proxy.isRealDealLocal()) {
+     return false;
+   }
+   final TXState localState = ((TXStateProxyImpl) proxy).getLocalRealDeal();
+   // The thread we waited for has already put a TXCommitMessage in the failover map.
+   // Saving again here could replace an existing token such as
+   // TXCommitMessage.REBALANCE_MSG with null (bug 42661), so nothing is stored.
+   return localState.waitForPreviousCompletion();
+ }
+
+ /**
+  * Looks up what the failover map holds for a completed transaction.
+  * @param txId the id of the transaction
+  * @return the stored commit message, an exception token such as
+  *         {@link TXCommitMessage#CMT_CONFLICT_MSG} if the transaction threw an
+  *         exception, or whatever the map returns for an unknown id
+  * @see #isExceptionToken(TXCommitMessage)
+  */
+ public TXCommitMessage getRecentlyCompletedMessage(TXId txId) {
+   final TXCommitMessage stored = failoverMap.get(txId);
+   return stored;
+ }
+
+ /**
+  * Identity check against the three TXCommitMessage exception tokens.
+  * @param msg the message to classify
+  * @return true if msg is CMT_CONFLICT_MSG, REBALANCE_MSG or EXCEPTION_MSG, false otherwise
+  */
+ public boolean isExceptionToken(TXCommitMessage msg) {
+   return msg == TXCommitMessage.CMT_CONFLICT_MSG
+       || msg == TXCommitMessage.REBALANCE_MSG
+       || msg == TXCommitMessage.EXCEPTION_MSG;
+ }
+
+ /**
+  * Maps one of the three TXCommitMessage exception tokens to the exception it stands for.
+  * The exception is returned, not thrown.
+  * @param msg the token that represents the exception
+  * @param txId the transaction id, used only in the conflict message
+  * @return a CommitConflictException, TransactionDataRebalancedException or
+  *         TransactionInDoubtException matching the token
+  * @throws InternalGemFireError if msg is not one of the three tokens
+  */
+ public RuntimeException getExceptionForToken(TXCommitMessage msg, TXId txId) {
+   if (msg == TXCommitMessage.CMT_CONFLICT_MSG) {
+     return new CommitConflictException(
+         LocalizedStrings.TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0.toLocalizedString(txId));
+   } else if (msg == TXCommitMessage.REBALANCE_MSG) {
+     return new TransactionDataRebalancedException(
+         LocalizedStrings.PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString());
+   } else if (msg == TXCommitMessage.EXCEPTION_MSG) {
+     return new TransactionInDoubtException(
+         LocalizedStrings.ClientTXStateStub_COMMIT_FAILED_ON_SERVER.toLocalizedString());
+   }
+   throw new InternalGemFireError("the parameter TXCommitMessage is not an exception token");
+ }
+
+ /**
+  * Message asking the receiving member to remove a set of hosted transactions. On receipt
+  * the ids are handed to {@code removeTransactions(txIds, false)} on the cache's TXManagerImpl,
+  * so the removal is not distributed again.
+  */
+ public static class TXRemovalMessage extends HighPriorityDistributionMessage {
+
+   /** ids of the transactions the receiver should remove */
+   Set<TXId> txIds;
+
+   /** for deserialization */
+   public TXRemovalMessage() {
+   }
+
+   /**
+    * Builds a removal message for {@code txIds}, addresses it to {@code recipients} and
+    * passes it to {@code dm.putOutgoing}.
+    */
+   static void send(DM dm, Set recipients, Set<TXId> txIds) {
+     TXRemovalMessage msg = new TXRemovalMessage();
+     msg.txIds = txIds;
+     msg.setRecipients(recipients);
+     dm.putOutgoing(msg);
+   }
+
+   @Override
+   public void toData(DataOutput out) throws IOException {
+     // writeHashSet needs a HashSet, but the field is only typed as Set. Copy any other
+     // Set implementation instead of failing with a ClassCastException.
+     final HashSet<TXId> ids;
+     if (this.txIds == null || this.txIds instanceof HashSet) {
+       ids = (HashSet<TXId>) this.txIds;
+     } else {
+       ids = new HashSet<TXId>(this.txIds);
+     }
+     DataSerializer.writeHashSet(ids, out);
+   }
+
+   @Override
+   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+     this.txIds = DataSerializer.readHashSet(in);
+   }
+
+   public int getDSFID() {
+     return TX_MANAGER_REMOVE_TRANSACTIONS;
+   }
+
+   @Override
+   protected void process(DistributionManager dm) {
+     GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+     // nothing to do when this member has no cache instance
+     if (cache != null) {
+       TXManagerImpl mgr = cache.getTXMgr();
+       // distribute == false: this member is itself a recipient of the broadcast
+       mgr.removeTransactions(this.txIds, false);
+     }
+   }
+
+ }
+
+ /**
+  * Transactions that are currently suspended, keyed by transaction id. Entries are added
+  * by suspend and removed by resume, tryResume and TXExpiryTask.
+  */
+ private ConcurrentMap<TransactionId, TXStateProxy> suspendedTXs = new ConcurrentHashMap<TransactionId, TXStateProxy>();
+
+ /**
+  * Suspends the calling thread's transaction, using minutes as the unit for the
+  * suspended-transaction expiry timeout.
+  * @return the id of the suspended transaction, or null if the thread had none
+  */
+ public TransactionId suspend() {
+   final TimeUnit expiryUnit = TimeUnit.MINUTES;
+   return suspend(expiryUnit);
+ }
+
+ TransactionId suspend(TimeUnit expiryTimeUnit) {
+ TXStateProxy result = getTXState();
+ if (result != null) {
+ TransactionId txId = result.getTransactionId();
+ internalSuspend();
+ this.suspendedTXs.put(txId, result);
+ // wake up waiting threads
+ Queue<Thread> waitingThreads = this.waitMap.get(txId);
+ if (waitingThreads != null) {
+ Thread waitingThread = null;
+ while (true) {
+ waitingThread = waitingThreads.poll();
+ if (waitingThread == null
+ || !Thread.currentThread().equals(waitingThread)) {
+ break;
+ }
+ }
+ if (waitingThread != null) {
+ LockSupport.unpark(waitingThread);
+ }
+ }
+ scheduleExpiry(txId, expiryTimeUnit);
+ return txId;
+ }
+ return null;
+ }
+
+ /**
+  * Resumes a suspended transaction on the calling thread.
+  * @param transactionId the id of the transaction to resume
+  * @throws IllegalStateException if the id is null, if the thread already has a
+  *         transaction state, or if no suspended transaction is registered under the id
+  */
+ public void resume(TransactionId transactionId) {
+   if (transactionId == null) {
+     throw new IllegalStateException(
+         LocalizedStrings.TXManagerImpl_UNKNOWN_TRANSACTION_OR_RESUMED.toLocalizedString());
+   }
+   if (getTXState() != null) {
+     throw new IllegalStateException(
+         LocalizedStrings.TXManagerImpl_TRANSACTION_ACTIVE_CANNOT_RESUME.toLocalizedString());
+   }
+   final TXStateProxy suspended = this.suspendedTXs.remove(transactionId);
+   if (suspended != null) {
+     resume(suspended);
+     return;
+   }
+   throw new IllegalStateException(
+       LocalizedStrings.TXManagerImpl_UNKNOWN_TRANSACTION_OR_RESUMED.toLocalizedString());
+ }
+
+ /**
+  * @param transactionId the id to check
+  * @return true if {@code suspendedTXs} currently has an entry for the id
+  */
+ public boolean isSuspended(TransactionId transactionId) {
+   final boolean suspended = this.suspendedTXs.containsKey(transactionId);
+   return suspended;
+ }
+
+ /**
+  * Non-throwing variant of resume: resumes the transaction only if it is suspended right
+  * now and the calling thread has no transaction state.
+  * @param transactionId the id of the transaction to resume
+  * @return true if the transaction was resumed on this thread, false otherwise
+  */
+ public boolean tryResume(TransactionId transactionId) {
+   if (transactionId == null || getTXState() != null) {
+     return false;
+   }
+   final TXStateProxy suspended = this.suspendedTXs.remove(transactionId);
+   if (suspended == null) {
+     return false;
+   }
+   resume(suspended);
+   return true;
+ }
+
+ /**
+  * Tracks, per transaction id, the threads that are waiting in
+  * {@link #tryResume(TransactionId, long, TimeUnit)} for that transaction.
+  * Queues are created lazily by the timed tryResume and polled by suspend
+  * to find a thread to unpark.
+  */
+ private ConcurrentMap<TransactionId, Queue<Thread>> waitMap = new ConcurrentHashMap<TransactionId, Queue<Thread>>();
+
+ /**
+  * Tries to resume the given transaction on the calling thread, waiting up to the
+  * given time for it to become suspended.
+  * Returns false at once if the id is null, the thread already has a transaction
+  * state, or exists(transactionId) is false. Otherwise the thread registers itself in
+  * waitMap, attempts tryResume(transactionId), and parks until woken or the remaining
+  * time elapses, repeating until the time budget is used up.
+  * @param transactionId the transaction to resume
+  * @param time how long to wait
+  * @param unit the unit of {@code time}
+  * @return true if the transaction was resumed on this thread, false otherwise
+  */
+ public boolean tryResume(TransactionId transactionId, long time, TimeUnit unit) {
+ if (transactionId == null || getTXState() != null || !exists(transactionId)) {
+ return false;
+ }
+ Thread currentThread = Thread.currentThread();
+ // remaining wait budget in nanoseconds; reduced after every park
+ long timeout = unit.toNanos(time);
+ long startTime = System.nanoTime();
+ Queue<Thread> threadq = null;
+
+ try {
+ while (true) {
+ // find or lazily create the waiter queue; putIfAbsent resolves creation races
+ threadq = waitMap.get(transactionId);
+ if (threadq == null) {
+ threadq = new ConcurrentLinkedQueue<Thread>();
+ Queue<Thread> oldq = waitMap.putIfAbsent(transactionId, threadq);
+ if (oldq != null) {
+ threadq = oldq;
+ }
+ }
+ // NOTE(review): this add runs on every pass of the loop, while the finally block
+ // issues a single remove; confirm duplicate entries cannot be left behind
+ threadq.add(currentThread);
+ // after putting this thread in waitMap, we should check for
+ // an entry in suspendedTXs. if no entry is found in suspendedTXs
+ // next invocation of suspend() will unblock this thread
+ if (tryResume(transactionId)) {
+ return true;
+ } else if (!exists(transactionId)) {
+ return false;
+ }
+ LockSupport.parkNanos(timeout);
+ // charge the time spent parked against the budget
+ long nowTime = System.nanoTime();
+ timeout -= nowTime - startTime;
+ startTime = nowTime;
+ if (timeout <= 0) {
+ break;
+ }
+ }
+ } finally {
+ // deregister this thread on every exit path
+ threadq = waitMap.get(transactionId);
+ if (threadq != null) {
+ threadq.remove(currentThread);
+ // the queue itself will be removed at commit/rollback
+ }
+ }
+ return false;
+ }
+
+ /**
+  * Reports whether the transaction is known to this manager: hosted and in progress,
+  * suspended, or present in {@code localTxMap}. The checks run in that order and stop at
+  * the first hit.
+  * @param transactionId the id to look for; it is cast to TXId for the hosted check
+  * @return true if any of the three lookups finds the id
+  */
+ public boolean exists(TransactionId transactionId) {
+   if (isHostedTxInProgress((TXId) transactionId)) {
+     return true;
+   }
+   if (isSuspended(transactionId)) {
+     return true;
+   }
+   return this.localTxMap.containsKey(transactionId);
+ }
+
+ /**
+  * Sets how long a suspended transaction may stay suspended before it is rolled back.
+  * A negative value means suspended transactions never expire.
+  * @param timeout the timeout in minutes
+  */
+ public void setSuspendedTransactionTimeout(long timeout) {
+   suspendedTXTimeout = timeout;
+ }
+
+ /**
+  * Returns how long a suspended transaction may stay suspended before it is rolled back.
+  * @return the timeout in minutes
+  * @see #setSuspendedTransactionTimeout(long)
+  */
+ public long getSuspendedTransactionTimeout() {
+   final long configured = this.suspendedTXTimeout;
+   return configured;
+ }
+
+ /**
+  * Tracks the scheduled expiry task of each suspended transaction, keyed by
+  * transaction id. scheduleExpiry stores the task here after scheduling it.
+  */
+ private ConcurrentMap<TransactionId, SystemTimerTask> expiryTasks = new ConcurrentHashMap<TransactionId, SystemTimerTask>();
+
+ /**
+  * Schedules a {@link TXExpiryTask} for the transaction to run after
+  * {@link #suspendedTXTimeout}, and records the task in {@code expiryTasks}.
+  * Nothing is scheduled when the timeout is negative.
+  * @param txId the transaction to expire
+  * @param expiryTimeUnit the unit of the timeout; it is converted to milliseconds for the timer
+  */
+ private void scheduleExpiry(TransactionId txId, TimeUnit expiryTimeUnit) {
+   final GemFireCacheImpl gemCache = (GemFireCacheImpl) this.cache;
+   if (suspendedTXTimeout >= 0) {
+     final SystemTimerTask expiry = new TXExpiryTask(txId);
+     if (logger.isDebugEnabled()) {
+       logger.debug("TX: scheduling transaction: {} to expire after:{}", txId, suspendedTXTimeout);
+     }
+     gemCache.getCCPTimer().schedule(expiry, TimeUnit.MILLISECONDS.convert(suspendedTXTimeout, expiryTimeUnit));
+     this.expiryTasks.put(txId, expiry);
+   } else if (logger.isDebugEnabled()) {
+     logger.debug("TX: transaction: {} not scheduled to expire", txId);
+   }
+ }
+
+ /**
+  * Task scheduled to expire a transaction when it is suspended.
+  * This task gets canceled if the transaction is resumed.
+  * When it runs, it removes the transaction from the manager's suspended map and, if it
+  * was still there, rolls it back.
+  * @author sbawaska
+  */
+ public static class TXExpiryTask extends SystemTimerTask {
+
+   /**
+    * The txId to expire
+    */
+   private final TransactionId txId;
+
+   public TXExpiryTask(TransactionId txId) {
+     this.txId = txId;
+   }
+
+   @Override
+   public void run2() {
+     TXManagerImpl mgr = TXManagerImpl.currentInstance;
+     if (mgr == null) {
+       // no current manager, so there is no suspended map to expire from; same guard as
+       // the static incRefCount/decRefCount helpers
+       return;
+     }
+     // winning the remove means nobody resumed the transaction in the meantime
+     TXStateProxy tx = mgr.suspendedTXs.remove(txId);
+     if (tx != null) {
+       try {
+         if (logger.isDebugEnabled()) {
+           logger.debug("TX: Expiry task rolling back transaction: {}", txId);
+         }
+         tx.rollback();
+       } catch (GemFireException e) {
+         logger.warn(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_IN_TRANSACTION_TIMEOUT, txId), e);
+       }
+     }
+   }
+ }
+ /**
+  * Entry creator for the ref count map. The value object is used as the hash entry
+  * itself, so creating an entry only links the value into the bucket chain.
+  */
+ private static class RefCountMapEntryCreator implements CustomEntryConcurrentHashMap.HashEntryCreator<AbstractRegionEntry, RefCountMapEntry> {
+   @Override
+   public HashEntry<AbstractRegionEntry, RefCountMapEntry> newEntry(AbstractRegionEntry key, int hash,
+       HashEntry<AbstractRegionEntry, RefCountMapEntry> next, RefCountMapEntry value) {
+     // the value doubles as the entry: chain it and hand it back
+     final RefCountMapEntry entry = value;
+     entry.setNextEntry(next);
+     return entry;
+   }
+
+   @Override
+   public int keyHashCode(Object key, boolean compareValues) {
+     // key will always be an AbstractRegionEntry because our map is strongly typed.
+     final AbstractRegionEntry regionEntry = (AbstractRegionEntry) key;
+     return regionEntry.getEntryHash();
+   }
+ }
+ /**
+  * Hash entry that is its own map value: it pairs a region entry key with a reference
+  * count that starts at 1 and is updated atomically through a field updater.
+  */
+ private static class RefCountMapEntry implements HashEntry<AbstractRegionEntry, RefCountMapEntry> {
+   private final AbstractRegionEntry key;
+   private HashEntry<AbstractRegionEntry, RefCountMapEntry> next;
+   private volatile int refCount;
+   private static final AtomicIntegerFieldUpdater<RefCountMapEntry> refCountUpdater
+       = AtomicIntegerFieldUpdater.newUpdater(RefCountMapEntry.class, "refCount");
+
+   /** Creates an entry for {@code k} with an initial reference count of 1. */
+   public RefCountMapEntry(AbstractRegionEntry k) {
+     this.key = k;
+     this.refCount = 1;
+   }
+
+   @Override
+   public AbstractRegionEntry getKey() {
+     return key;
+   }
+
+   @Override
+   public boolean isKeyEqual(Object k) {
+     return key.equals(k);
+   }
+
+   @Override
+   public RefCountMapEntry getMapValue() {
+     return this;
+   }
+
+   /** Only accepts this entry itself as the new value. */
+   @Override
+   public void setMapValue(RefCountMapEntry newValue) {
+     if (newValue == this) {
+       return;
+     }
+     throw new IllegalStateException("Expected newValue " + newValue + " to be this " + this);
+   }
+
+   @Override
+   public int getEntryHash() {
+     return key.getEntryHash();
+   }
+
+   @Override
+   public HashEntry<AbstractRegionEntry, RefCountMapEntry> getNextEntry() {
+     return next;
+   }
+
+   @Override
+   public void setNextEntry(HashEntry<AbstractRegionEntry, RefCountMapEntry> n) {
+     next = n;
+   }
+
+   /** Atomically adds one to the reference count. */
+   public void incRefCount() {
+     refCountUpdater.incrementAndGet(this);
+   }
+
+   /**
+    * Atomically subtracts one from the reference count.
+    * @return true if refCount goes to 0
+    * @throws IllegalStateException if the count drops below zero
+    */
+   public boolean decRefCount() {
+     final int remaining = refCountUpdater.decrementAndGet(this);
+     if (remaining < 0) {
+       throw new IllegalStateException("rc=" + remaining);
+     }
+     return remaining == 0;
+   }
+ }
+
+ /**
+  * Reference counts per region entry, updated through the static incRefCount and
+  * decRefCount methods. Built with the map's default capacity, load factor and
+  * concurrency level, and with RefCountMapEntryCreator so that each value is its own
+  * hash entry.
+  * NOTE(review): the meaning of the boolean {@code true} argument is not visible here;
+  * check the CustomEntryConcurrentHashMap constructor.
+  */
+ private final CustomEntryConcurrentHashMap<AbstractRegionEntry, RefCountMapEntry> refCountMap
+ = new CustomEntryConcurrentHashMap<AbstractRegionEntry, RefCountMapEntry>(
+ CustomEntryConcurrentHashMap.DEFAULT_INITIAL_CAPACITY,
+ CustomEntryConcurrentHashMap.DEFAULT_LOAD_FACTOR,
+ CustomEntryConcurrentHashMap.DEFAULT_CONCURRENCY_LEVEL,
+ true,
+ new RefCountMapEntryCreator());
+
+ /**
+  * Callback passed to {@code refCountMap.create} by incRefCount: it supplies a fresh entry
+  * through newValue and bumps the count of an existing entry through oldValueRead.
+  * Removal is not expected on this path.
+  */
+ private static final MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object> incCallback = new MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object>() {
+   @Override
+   public RefCountMapEntry newValue(AbstractRegionEntry regionEntry, Object ctx,
+       Object params) {
+     // a new RefCountMapEntry starts with a count of 1
+     return new RefCountMapEntry(regionEntry);
+   }
+   @Override
+   public void oldValueRead(RefCountMapEntry existing) {
+     existing.incRefCount();
+   }
+   @Override
+   public boolean doRemoveValue(RefCountMapEntry existing, Object ctx,
+       Object params) {
+     throw new IllegalStateException("doRemoveValue should not be called from create");
+   }
+ };
+
+ /**
+  * Callback passed to {@code refCountMap.removeConditionally} by decRefCount: doRemoveValue
+  * decrements the count and answers true only when it reached zero. Creation and
+  * read callbacks are not expected on this path.
+  */
+ private static final MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object> decCallback = new MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object>() {
+   @Override
+   public RefCountMapEntry newValue(AbstractRegionEntry regionEntry, Object ctx,
+       Object params) {
+     throw new IllegalStateException("newValue should not be called from remove");
+   }
+   @Override
+   public void oldValueRead(RefCountMapEntry existing) {
+     throw new IllegalStateException("oldValueRead should not be called from remove");
+   }
+   @Override
+   public boolean doRemoveValue(RefCountMapEntry existing, Object ctx,
+       Object params) {
+     final boolean reachedZero = existing.decRefCount();
+     return reachedZero;
+   }
+ };
+
+ /**
+  * Adds a reference for the region entry in the current manager's ref count map,
+  * creating the map entry if needed. Does nothing when there is no current manager.
+  * @param re the region entry being referenced
+  */
+ public static final void incRefCount(AbstractRegionEntry re) {
+   final TXManagerImpl manager = currentInstance;
+   if (manager == null) {
+     return;
+   }
+   manager.refCountMap.create(re, incCallback, null, null, true);
+ }
+ /**
+  * Drops one reference for the region entry in the current manager's ref count map.
+  * @param re the region entry being released
+  * @return true if refCount went to zero; also true when there is no current manager
+  */
+ public static final boolean decRefCount(AbstractRegionEntry re) {
+   final TXManagerImpl manager = currentInstance;
+   if (manager == null) {
+     return true;
+   }
+   // a non-null result is treated as the count having reached zero
+   return manager.refCountMap.removeConditionally(re, decCallback, null, null) != null;
+ }
+
+ /**
+  * Used by tests.
+  * @return the key set of {@code localTxMap}, handed out directly without copying
+  */
+ public Set<TXId> getLocalTxIds() {
+   final Set<TXId> localIds = this.localTxMap.keySet();
+   return localIds;
+ }
+
+ /**
+  * Used by tests.
+  * @return a new list holding the ids of the hosted transactions, copied while holding
+  *         the monitor of {@code hostedTXStates}
+  */
+ public ArrayList<TXId> getHostedTxIds() {
+   final ArrayList<TXId> snapshot;
+   synchronized (this.hostedTXStates) {
+     snapshot = new ArrayList<TXId>(this.hostedTXStates.keySet());
+   }
+   return snapshot;
+ }
+
+ /**
+  * Sets whether transactions are distributed, storing the value in {@code isTXDistributed}.
+  * @param flag the requested mode
+  * @throws IllegalStateException if a transaction is in progress and {@code flag} differs
+  *         from the current mode
+  */
+ public void setDistributed(boolean flag) {
+   checkClosed();
+   TXStateProxy tx = getTXState();
+   // Check whether given flag and current flag are different and whether a transaction is in progress
+   if (tx != null && flag != isDistributed()) {
+     // Cannot change mode in the middle of a transaction
+     throw new java.lang.IllegalStateException(
+         LocalizedStrings.TXManagerImpl_CANNOT_CHANGE_TRANSACTION_MODE_WHILE_TRANSACTIONS_ARE_IN_PROGRESS
+             .toLocalizedString());
+   }
+   // Boolean.valueOf reuses the shared TRUE/FALSE instances instead of allocating a new one
+   isTXDistributed.set(Boolean.valueOf(flag));
+ }
+
+ /**
+  * Reports whether transactions are distributed.
+  * If explicitly set using setDistributed, this returns that value.
+  * If not, it returns the getDistributedTransactions() value of the original
+  * configuration of any InternalDistributedSystem instance (the gemfire property
+  * "distributed-transactions", or its default when that is not set).
+  * @return the effective distributed-transactions mode
+  */
+ public boolean isDistributed() {
+   final Boolean explicit = isTXDistributed.get();
+   if (explicit != null) {
+     return explicit.booleanValue();
+   }
+   // This can be null if not set in setDistributed().
+   return InternalDistributedSystem.getAnyInstance().getOriginalConfig().getDistributedTransactions();
+ }
+
+}