You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/23 21:23:27 UTC

[31/94] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
index 83b00b4,0000000..3d6ce73
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/EventTracker.java
@@@ -1,811 -1,0 +1,812 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +/**
 + * 
 + */
 +package com.gemstone.gemfire.internal.cache;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentMap;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.DataSerializable;
 +import com.gemstone.gemfire.DataSerializer;
 +import com.gemstone.gemfire.cache.client.PoolFactory;
 +import com.gemstone.gemfire.distributed.DistributedMember;
 +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 +import com.gemstone.gemfire.internal.Assert;
 +import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
 +import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
 +import com.gemstone.gemfire.internal.cache.versions.RegionVersionVector;
 +import com.gemstone.gemfire.internal.cache.versions.VersionTag;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
 +import com.gemstone.gemfire.internal.util.concurrent.StoppableCountDownLatch;
 +
 +/**
 + * EventTracker tracks the last sequence number for a particular
 + * memberID:threadID.  It is used to avoid replaying events in
 + * client/server and partitioned-region configurations.
 + * 
 + * @author bruce
 + * @since 6.0
 + */
 +public class EventTracker
 +{
 +  private static final Logger logger = LogService.getLogger();
 +  
 +  /**
 +   * a mapping of originator to the last event applied to this cache 
 +   *
 +   * Keys are instances of {@link ThreadIdentifier}, values are instances
 +   * of {@link com.gemstone.gemfire.internal.cache.EventTracker.EventSeqnoHolder}.
 +   */
 +  protected final ConcurrentMap<ThreadIdentifier, EventSeqnoHolder> recordedEvents 
 +      = new ConcurrentHashMap<ThreadIdentifier, EventSeqnoHolder>(100);
 + 
 +  /** a mapping of originator to bulkOp's last status (true means
 +   * finished processing) applied to this cache. 
 +   *
 +   * Keys are instances of @link {@link ThreadIdentifier}, values are instances
 +   * of {@link BulkOpProcessed}.
 +   */
 +  private final ConcurrentMap<ThreadIdentifier, BulkOpProcessed> recordedBulkOps 
 +      = new ConcurrentHashMap<ThreadIdentifier, BulkOpProcessed>(100);
 +  
 +  /** a mapping of originator to bulkOperation's last version tags. This map
 +   * differs from {@link #recordedBulkOps} in that the thread identifier used
 +   * here is the base member id and thread id of the bulk op, as opposed to the fake
 +   * thread id which is assigned for each bucket.
 +   * 
 +   * recordedBulkOps are also only tracked on the secondary for partitioned regions
 +   * recordedBulkOpVersionTags are tracked on both the primary and secondary.
 +   *
 +   * Keys are instances of @link {@link ThreadIdentifier}, values are instances
 +   * of {@link BulkOpHolder}.
 +   */
 +  private final ConcurrentMap<ThreadIdentifier, BulkOpHolder> recordedBulkOpVersionTags 
 +      = new ConcurrentHashMap<ThreadIdentifier, BulkOpHolder>(100);
 +  
 +  /**
 +   * The member that the region corresponding to this tracker (if any)
 +   * received its initial image from (if a replicate)
 +   */
 +  private volatile InternalDistributedMember initialImageProvider;
 +
 + 
 +  /**
 +   * The cache associated with this tracker
 +   */
 +  GemFireCacheImpl cache;
 + 
 +  /**
 +   * The name of this tracker
 +   */
 +  String name;
 +  
 +  /**
 +   * whether or not this tracker has been initialized with state from
 +   * another process
 +   */
 +  volatile boolean initialized;
 + 
 +  /**
 +   * object used to wait for initialization 
 +   */
 +  final StoppableCountDownLatch initializationLatch;
 +  
 +  /**
 +  * Initialize the EventTracker's timer task.  This is stored in the cache
 +  * for tracking and shutdown purposes
 +  * @param cache the cache to schedule tasks with
 +  */
 +  public static ExpiryTask startTrackerServices(GemFireCacheImpl cache) {
 +    long expiryTime = Long.getLong("gemfire.messageTrackingTimeout",
 +        PoolFactory.DEFAULT_SUBSCRIPTION_MESSAGE_TRACKING_TIMEOUT / 3).longValue();
 +    ExpiryTask result = new ExpiryTask(cache, expiryTime);
 +    cache.getCCPTimer().scheduleAtFixedRate(result,
 +        expiryTime, expiryTime);
 +    //schedule(result, expiryTime);
 +    return result;
 +  }
 +  
 +  /**
 +   * Terminate the tracker's timer task
 +   * @param cache the cache holding the tracker task
 +   */
 +  public static void stopTrackerServices(GemFireCacheImpl cache) {
 +    cache.getEventTrackerTask().cancel();
 +  }
 + 
 + /**
 +  * Create an event tracker
 +  * @param region the cache region to associate with this tracker
 +  */
 +  public EventTracker(LocalRegion region) {
 +   this.cache = region.cache;
 +   this.name = "Event Tracker for " + region.getName();
 +   this.initializationLatch = new StoppableCountDownLatch(region.stopper, 1);
 + }
 +
 +  /** start this event tracker */
 +  public void start() {
 +    if (this.cache.getEventTrackerTask() != null) {
 +      this.cache.getEventTrackerTask().addTracker(this);
 +    }
 +  }
 +  
 +  /** stop this event tracker */
 +  public void stop() {
 +    if (this.cache.getEventTrackerTask() != null) {
 +      this.cache.getEventTrackerTask().removeTracker(this);
 +    }
 +  }
 +  
 +  /**
 +   * retrieve a deep copy of the state of the event tracker.  Synchronization
 +   * is not used while copying the tracker's state.
 +   */
 +  public Map<ThreadIdentifier, EventSeqnoHolder> getState() {
 +    Map<ThreadIdentifier, EventSeqnoHolder> result = new HashMap<ThreadIdentifier, EventSeqnoHolder>(recordedEvents.size());
 +    for (Iterator<Map.Entry<ThreadIdentifier, EventSeqnoHolder>> it=recordedEvents.entrySet().iterator(); it.hasNext(); ) {
 +      Map.Entry<ThreadIdentifier, EventSeqnoHolder> entry = it.next();
 +      EventSeqnoHolder holder = entry.getValue();
 +      result.put(entry.getKey(), new EventSeqnoHolder(
 +          holder.lastSeqno, null)); // don't transfer version tags - adds too much bulk just so we can do client tag recovery
 +    }
 +    return result;
 +  }
 +  
 +  /**
 +   * record the given state in the tracker.
 +   * @param provider the member that provided this state
 +   * @param state a Map obtained from getState();
 +   */
 +  public void recordState(InternalDistributedMember provider, Map<ThreadIdentifier, EventSeqnoHolder> state) {
 +    this.initialImageProvider = provider;
 +    StringBuffer sb = null;
 +    if (logger.isDebugEnabled()) {
 +      sb = new StringBuffer(200);
 +      sb.append("Recording initial state for ")
 +        .append(this.name)
 +        .append(": ");
 +    }
 +    for (Iterator<Map.Entry<ThreadIdentifier, EventSeqnoHolder>> it=state.entrySet().iterator(); it.hasNext(); ) {
 +      Map.Entry<ThreadIdentifier, EventSeqnoHolder> entry = it.next();
 +      if (sb != null) {
 +        sb.append("\n  ")
 +          .append(entry.getKey().expensiveToString())
 +          .append("; sequenceID=")
 +          .append(entry.getValue()); 
 +      }
 +      // record only if we haven't received an event that is newer
 +      recordSeqno(entry.getKey(), entry.getValue(), true);
 +    }
 +    if (sb != null) {
 +      logger.debug(sb);
 +    }
 +    // fix for bug 41622 - hang in GII.  This keeps ops from waiting for the
 +    // full GII to complete
 +    setInitialized();
 +  }
 +  
 +  /**
 +   * Use this method to ensure that the tracker is put in an initialized state
 +   */
 +  public void setInitialized() {
 +    this.initializationLatch.countDown();
 +    this.initialized = true;
 +  }
 +  
 +  /**
 +   * Wait for the tracker to finishe being initialized
 +   */
 +  public void waitOnInitialization() throws InterruptedException {
 +    this.initializationLatch.await();
 +  }
 +  
 +  /**
 +   * Record an event sequence id if it is higher than what we currently have.
 +   * This is intended for use during initial image transfer.
 +   * @param membershipID the key of an entry in the map obtained from getEventState()
 +   * @param evhObj the value of an entry in the map obtained from getEventState()
 +   */
 +  protected void recordSeqno(ThreadIdentifier membershipID, EventSeqnoHolder evhObj){
 +    recordSeqno(membershipID, evhObj, false);
 +  }
 +  
 +  /**
 +   * Record an event sequence id if it is higher than what we currently have.
 +   * This is intended for use during initial image transfer.
 +   * @param threadID the key of an entry in the map obtained from getEventState()
 +   * @param evh the value of an entry in the map obtained from getEventState()
 +   * @param ifAbsent only record this state if there's not already an entry for this memberID
 +   */
 +  private void recordSeqno(ThreadIdentifier threadID, EventSeqnoHolder evh, boolean ifAbsent) {
 +    boolean removed;
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("recording {} {}", threadID.expensiveToString(), evh.toString());
 +    }
 +    do {
 +      removed = false;
 +      EventSeqnoHolder oldEvh = recordedEvents.putIfAbsent(
 +          threadID, evh);
 +      if (oldEvh != null) {
 +        synchronized(oldEvh) {
 +          if (oldEvh.removed) {
 +            // need to wait for an entry being removed by the sweeper to go away
 +            removed = true;
 +            continue;
 +          }
 +          else {
 +            if (ifAbsent) {
 +              break;
 +            }
 +            oldEvh.endOfLifeTimer = 0;
 +            if (oldEvh.lastSeqno < evh.lastSeqno) {
 +              oldEvh.lastSeqno = evh.lastSeqno;
 +              oldEvh.versionTag = evh.versionTag;
 +//              Exception e = oldEvh.context;
 +//              oldEvh.context = new Exception("stack trace");
 +//              oldEvh.context.initCause(e);
 +            }
 +          }
 +        }
 +      }
 +      else {
 +        evh.endOfLifeTimer = 0;
 +//        evh.context = new Exception("stack trace");
 +      }
 +    } while (removed);
 +  }
 +  
 +  /** record the event's threadid/sequenceid to prevent replay */
 +  public void recordEvent(InternalCacheEvent event) {
 +    EventID eventID = event.getEventId();
 +    if (ignoreEvent(event, eventID)) {
 +      return; // not tracked
 +    }
 +
 +    LocalRegion lr = (LocalRegion)event.getRegion();
 +
 +    ThreadIdentifier membershipID = new ThreadIdentifier(eventID.getMembershipID(),
 +        eventID.getThreadID());
 +
 +    VersionTag tag = null;
 +    if (lr.getServerProxy() == null/* && event.hasClientOrigin()*/) { // clients do not need to store version tags for replayed events
 +      tag = event.getVersionTag();
 +      RegionVersionVector v = ((LocalRegion)event.getRegion()).getVersionVector();
 +      // bug #46453 - make sure ID references are canonical before storing
 +      if (v != null && tag != null) {
 +        tag.setMemberID(v.getCanonicalId(tag.getMemberID()));
 +        if (tag.getPreviousMemberID() != null) {
 +          tag.setPreviousMemberID(v.getCanonicalId(tag.getPreviousMemberID()));
 +        }
 +      }
 +    }
-     EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag);
-     if (logger.isTraceEnabled()){
-       logger.trace("region event tracker recording {}", event);
-     }
-     recordSeqno(membershipID, newEvh);
 +    
 +    //If this is a bulkOp, and concurrency checks are enabled, we need to
 +    //save the version tag in case we retry.
 +    if (lr.concurrencyChecksEnabled 
 +        && (event.getOperation().isPutAll() || event.getOperation().isRemoveAll()) && lr.getServerProxy() == null) {
 +      recordBulkOpEvent(event, membershipID);
 +    }
++
++    EventSeqnoHolder newEvh = new EventSeqnoHolder(eventID.getSequenceID(), tag);
++    if (logger.isTraceEnabled()){
++      logger.trace("region event tracker recording {}", event);
++    }
++    recordSeqno(membershipID, newEvh);
 +  }
 +
 +  /**
 +   * Record a version tag for a bulk operation
 +   */
 +  private void recordBulkOpEvent(InternalCacheEvent event, ThreadIdentifier tid) {
 +    EventID eventID = event.getEventId();
 +    
 +    VersionTag tag = event.getVersionTag();
 +    if (tag == null) {
 +      return;
 +    }
 +    
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("recording bulkOp event {} {} {} op={}", tid.expensiveToString(),
 +          eventID, tag, event.getOperation());
 +    }
 +
 +    RegionVersionVector v = ((LocalRegion)event.getRegion()).getVersionVector();
 +    // bug #46453 - make sure ID references are canonical before storing
 +    if (v != null) {
 +      tag.setMemberID(v.getCanonicalId(tag.getMemberID()));
 +      if (tag.getPreviousMemberID() != null) {
 +        tag.setPreviousMemberID(v.getCanonicalId(tag.getPreviousMemberID()));
 +      }
 +    }
 +
 +    //Loop until we can successfully update the recorded bulk operations
 +    //For this thread id.
 +    boolean retry = false;
 +    do {
 +      BulkOpHolder bulkOpTracker = recordedBulkOpVersionTags.get(tid);
 +      if(bulkOpTracker == null) {
 +        bulkOpTracker = new BulkOpHolder();
 +        BulkOpHolder old = recordedBulkOpVersionTags.putIfAbsent(tid, bulkOpTracker);
 +        if(old != null) {
 +          retry = true;
 +          continue;
 +        }
 +      }
 +      synchronized(bulkOpTracker) {
 +        if(bulkOpTracker.removed) {
 +          retry = true;
 +          continue;
 +        }
 +        
 +        //Add the version tag for bulkOp event.
 +        bulkOpTracker.putVersionTag(eventID, event.getVersionTag());
 +        retry = false;
 +      }
 +    } while(retry);
 +  }
 +  
 +  public boolean hasSeenEvent(InternalCacheEvent event) {
 +//  ClientProxyMembershipID membershipID = event.getContext();
 +    EventID eventID = event.getEventId();
 +    if (ignoreEvent(event, eventID)) {
 +      return false; // not tracked
 +    }
 +    return hasSeenEvent(eventID, event);
 +  }
 +  
 +  public boolean hasSeenEvent(EventID eventID) {
 +    return hasSeenEvent(eventID, null);
 +  }
 +
 +  public boolean hasSeenEvent(EventID eventID, InternalCacheEvent tagHolder) {
 +    ThreadIdentifier membershipID = new ThreadIdentifier(
 +        eventID.getMembershipID(), eventID.getThreadID());
 +//  if (membershipID == null || eventID == null) {
 +//    return false;
 +//  }
 +        
 +    EventSeqnoHolder evh = recordedEvents.get(membershipID);
 +    if (evh == null) {
 +      return false;
 +    }
 +    
 +    synchronized (evh) {
 +      if (evh.removed || evh.lastSeqno < eventID.getSequenceID()) {
 +        return false;
 +      }
 +      // log at fine because partitioned regions can send event multiple times
 +      // during normal operation during bucket region initialization
 +      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER)) {
 +        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, "Cache encountered replay of event with ID {}.  Highest recorded for this source is {}",
 +            eventID, evh.lastSeqno);
 +      }
 +      // bug #44956 - recover version tag for duplicate event
 +      if (evh.lastSeqno == eventID.getSequenceID() && tagHolder != null && evh.versionTag != null) {
 +        ((EntryEventImpl)tagHolder).setVersionTag(evh.versionTag);
 +      }
 +      return true;
 +    } // synchronized
 +  }
 +
 +  public VersionTag findVersionTag(EventID eventID) {
 +    ThreadIdentifier threadID = new ThreadIdentifier(
 +        eventID.getMembershipID(), eventID.getThreadID());
 +        
 +    EventSeqnoHolder evh = recordedEvents.get(threadID);
 +    if (evh == null) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("search for version tag failed as no event is recorded for {}", threadID.expensiveToString());
 +      }
 +      return null;
 +    }
 +    
 +    synchronized (evh) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("search for version tag located last event for {}: {}",threadID.expensiveToString(), evh);
 +      }
 +      if (evh.lastSeqno != eventID.getSequenceID()) {
 +        return null;
 +      }
 +      // log at fine because partitioned regions can send event multiple times
 +      // during normal operation during bucket region initialization
 +      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER) && evh.versionTag == null) {
 +        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, "Could not recover version tag.  Found event holder with no version tag for {}", eventID);
 +      }
 +      return evh.versionTag;
 +    } // synchronized
 +  }
 +  
 +  public VersionTag findVersionTagForGateway(EventID eventID) {
 +    ThreadIdentifier threadID = new ThreadIdentifier(
 +        eventID.getMembershipID(), eventID.getThreadID());
 +        
 +    EventSeqnoHolder evh = recordedEvents.get(threadID);
 +    if (evh == null) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("search for version tag failed as no event is recorded for {}", threadID.expensiveToString());
 +      }
 +      return null;
 +    }
 +    
 +    synchronized (evh) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("search for version tag located last event for {}: {} {}",threadID.expensiveToString(), evh, eventID.getSequenceID() );
 +      }
 +      
 +      if (evh.lastSeqno < eventID.getSequenceID()) {
 +        return null;
 +      }
 +      // log at fine because partitioned regions can send event multiple times
 +      // during normal operation during bucket region initialization
 +      if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_BRIDGE_SERVER) && evh.versionTag == null) {
 +        logger.trace(LogMarker.DISTRIBUTION_BRIDGE_SERVER, "Could not recover version tag.  Found event holder with no version tag for {}", eventID);
 +      }
 +      return evh.versionTag;
 +    } // synchronized
 +  }
 +  
 +  
 +  public VersionTag findVersionTagForBulkOp(EventID eventID) {
 +    ThreadIdentifier threadID = new ThreadIdentifier(
 +        eventID.getMembershipID(), eventID.getThreadID());
 +        
 +    BulkOpHolder evh = recordedBulkOpVersionTags.get(threadID);
 +    if (evh == null) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("search for version tag failed as no events are recorded for {}", threadID.expensiveToString());
 +      }
 +      return null;
 +    }
 +    
 +    synchronized (evh) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("search for version tag located event holder for {}: {}", threadID.expensiveToString(), evh);
 +      }
 +      return evh.entryVersionTags.get(eventID);
 +    } // synchronized
 +  }
 +
 +  /**
 +   * @param event
 +   * @param eventID
 +   * @return true if the event should not be tracked, false otherwise
 +   */
 +  private boolean ignoreEvent(InternalCacheEvent event, EventID eventID) {
 +    if (eventID == null) {
 +      return true;
 +    } else {
 +      boolean isVersioned = (event.getVersionTag() != null);
 +      boolean isClient = event.hasClientOrigin();
 +      if (isVersioned && isClient) {
 +        return false; // version tags for client events are kept for retries by the client
 +      }
 +      boolean isEntry = event.getOperation().isEntry();
 +      boolean isPr = event.getRegion().getAttributes().getDataPolicy().withPartitioning()
 +                   || ((LocalRegion)event.getRegion()).isUsedForPartitionedRegionBucket();
 +      return (!isClient &&   // ignore if it originated on a server, and
 +           isEntry &&        // it affects an entry and
 +          !isPr);            // is not on a PR
 +    }
 +  }
 +
 +  /**
 +   * A routine to provide synchronization running based on <memberShipID, threadID> 
 +   * of the requesting client
 +   * @param r - a Runnable to wrap the processing of the bulk op
 +   * @param eventID - the base event ID of the bulk op
 +   *
 +   * @since 5.7
 +   */
 +  public void syncBulkOp(Runnable r, EventID eventID) {
 +    Assert.assertTrue(eventID != null);
 +    ThreadIdentifier membershipID = new ThreadIdentifier(
 +      eventID.getMembershipID(), eventID.getThreadID());
 +
 +    BulkOpProcessed opSyncObj = recordedBulkOps.putIfAbsent(membershipID, new BulkOpProcessed(false));
 +    if (opSyncObj == null) {
 +      opSyncObj = recordedBulkOps.get(membershipID);
 +    }
 +    synchronized (opSyncObj) {
 +      try {
 +        if (opSyncObj.getStatus() && logger.isDebugEnabled()) {
 +          logger.debug("SyncBulkOp: The operation was performed by another thread.");
 +        }
 +        else {
 +          recordBulkOpStart(membershipID);
 +          
 +          //Perform the bulk op
 +          r.run();
 +          // set to true in case another thread is waiting at sync 
 +          opSyncObj.setStatus(true);
 +          recordedBulkOps.remove(membershipID);
 +        }
 +      }
 +      finally {
 +        recordedBulkOps.remove(membershipID);
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Called when a bulkOp is started on the local region. Used to clear
 +   * event tracker state from the last bulkOp.
 +   */
 +  public void recordBulkOpStart(ThreadIdentifier tid) {
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("recording bulkOp start for {}", tid.expensiveToString());
 +    }
 +    this.recordedBulkOpVersionTags.remove(tid);
 +  }
 +  
 +  /**
 +   * @return the initialized
 +   */
 +  public boolean isInitialized() {
 +    return this.initialized;
 +  }
 +  
 +  /**
 +   * @param mbr the member in question
 +   * @return true if the given member provided the initial image event state for this tracker
 +   */
 +  public boolean isInitialImageProvider(DistributedMember mbr) {
 +    return (this.initialImageProvider != null)
 +      && (mbr != null)
 +      && this.initialImageProvider.equals(mbr);
 +  }
 +  
 +  /**
 +   * Test method for getting the set of recorded version tags.
 +   */
 +  protected ConcurrentMap<ThreadIdentifier, BulkOpHolder> getRecordedBulkOpVersionTags() {
 +    return recordedBulkOpVersionTags;
 +  }
 +  
 +  @Override
 +  public String toString() {
 +    return ""+this.name+"(initialized=" + this.initialized+")";
 +  }
 +  
 +  /**
 +   * A sequence number tracker to keep events from clients from being
 +   * re-applied to the cache if they've already been seen.
 +   * @author bruce
 +   * @since 5.5
 +   */
 +  static class EventSeqnoHolder implements DataSerializable {
 +    private static final long serialVersionUID = 8137262960763308046L;
 +
 +    /** event sequence number.  These  */
 +    long lastSeqno = -1;
 +    
 +    /** millisecond timestamp */
 +    transient long endOfLifeTimer;
 +    
 +    /** whether this entry is being removed */
 +    transient boolean removed;
 +    
 +    /**
 +     * version tag, if any, for the operation
 +     */
 +    VersionTag versionTag;
 +    
 +    // for debugging
 +//    transient Exception context;
 +    
 +    EventSeqnoHolder(long id, VersionTag versionTag) {
 +      this.lastSeqno = id;
 +      this.versionTag = versionTag;
 +    }
 +    
 +    public EventSeqnoHolder() {
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      StringBuilder result = new StringBuilder();
 +      result.append("seqNo").append(this.lastSeqno);
 +      if (this.versionTag != null) {
 +        result.append(",").append(this.versionTag);
 +      }
 +      return result.toString();
 +    }
 +
 +    public void fromData(DataInput in) throws IOException,
 +        ClassNotFoundException {
 +      lastSeqno = in.readLong();
 +      versionTag = (VersionTag)DataSerializer.readObject(in);
 +    }
 +
 +    public void toData(DataOutput out) throws IOException {
 +      out.writeLong(lastSeqno);
 +      DataSerializer.writeObject(versionTag, out);
 +    }
 +  }
 +
 +  /**
 +   * A status tracker for each bulk operation (putAll or removeAll) from originators specified by
 +   * membershipID and threadID in the cache
 +   * processed is true means the bulk op is processed by one thread 
 +   * no need to redo it by other threads.
 +   * @author Gester
 +   * @since 5.7
 +   */
 +  static class BulkOpProcessed {
 +    /** whether the op is processed */
 +    private boolean processed;
 +  
 +    /**
 +     * creates a new instance to save status of a bulk op 
 +     * @param status true if the op has been processed 
 +     */
 +    BulkOpProcessed(boolean status) {
 +      this.processed = status;
 +    }
 +    
 +    /**
 +     * setter method to change the status
 +     * @param status true if the op has been processed 
 +     */
 +    void setStatus(boolean status) {
 +      this.processed = status;
 +    }
 +    
 +    /**
 +     * getter method to peek the current status
 +     * @return current status
 +     */
 +    boolean getStatus() {
 +      return this.processed;
 +    }
 +    
 +    @Override
 +    public String toString() {
 +      return "BULKOP("+this.processed+")";
 +    }
 +  }
 +  
 +  /**
 +   * A holder for the version tags generated for a bulk operation (putAll or removeAll). These
 +   * version tags are retrieved when a bulk op is retried.
 +   * @author Dan
 +   * @since 7.0
 +   * protected for test purposes only.
 +   */
 +  protected static class BulkOpHolder {
 +    /**
 +     * Whether this object was removed by the cleanup thread.
 +     */
 +    public boolean removed;
 +    /**
 +     * public for tests only
 +     */
 +    public Map<EventID, VersionTag> entryVersionTags = new HashMap<EventID, VersionTag>();
 +    /** millisecond timestamp */
 +    transient long endOfLifeTimer;
 +  
 +    /**
 +     * creates a new instance to save status of a putAllOperation 
 +     */
 +    BulkOpHolder() {
 +    }
 +    
 +    public void putVersionTag(EventID eventId, VersionTag versionTag) {
 +      entryVersionTags.put(eventId, versionTag);
 +      this.endOfLifeTimer = 0;
 +    }
 +
 +    
 +    @Override
 +    public String toString() {
 +      return "BulkOpHolder tags=" + this.entryVersionTags;
 +    }
 +  }
 +  
 +  static class ExpiryTask extends SystemTimerTask {
 +    
 +    GemFireCacheImpl cache;
 +    long expiryTime;
 +    List trackers = new LinkedList();
 +    
 +    public ExpiryTask(GemFireCacheImpl cache, long expiryTime) {
 +      this.cache = cache;
 +      this.expiryTime = expiryTime;
 +    }
 +    void addTracker(EventTracker tracker) {
 +      synchronized(trackers) {
 +        trackers.add(tracker);
 +      }
 +    }
 +    void removeTracker(EventTracker tracker) {
 +      synchronized(trackers) {
 +        trackers.remove(tracker);
 +      }
 +    }
 +    int getNumberOfTrackers() {
 +      return trackers.size();
 +    }
 +    @Override
 +    public void run2() {
 +      long now = System.currentTimeMillis();
 +      long timeout = now - expiryTime;
 +      final boolean traceEnabled = logger.isTraceEnabled();
 +      synchronized(trackers) {
 +        for (Iterator it=trackers.iterator(); it.hasNext(); ) {
 +          EventTracker tracker = (EventTracker)it.next();
 +          if (traceEnabled) {
 +            logger.trace("{} sweeper: starting", tracker.name);
 +          }
 +          for (Iterator it2 = tracker.recordedEvents.entrySet().iterator(); it2.hasNext();) {
 +            Map.Entry e = (Map.Entry)it2.next();
 +            EventSeqnoHolder evh = (EventSeqnoHolder)e.getValue();
 +            synchronized(evh) {
 +              if (evh.endOfLifeTimer == 0) {
 +                evh.endOfLifeTimer = now; // a new holder - start the timer 
 +              }
 +              if (evh.endOfLifeTimer <= timeout) {
 +                evh.removed = true;
 +                evh.lastSeqno = -1;
 +                if (traceEnabled) {
 +                  logger.trace("{} sweeper: removing {}", tracker.name, e.getKey());
 +                }
 +                it2.remove();
 +              }
 +            }
 +          }
 +          
 +          //Remove bulk operations we're tracking
 +          for (Iterator<Map.Entry<ThreadIdentifier, BulkOpHolder>> it2 = tracker.recordedBulkOpVersionTags.entrySet().iterator(); it2.hasNext();) {
 +            Map.Entry<ThreadIdentifier, BulkOpHolder> e = it2.next();
 +            BulkOpHolder evh = e.getValue();
 +            synchronized(evh) {
 +              if (evh.endOfLifeTimer == 0) {
 +                evh.endOfLifeTimer = now; // a new holder - start the timer 
 +              }
 +              //Remove the PutAll tracker only if the put all is complete
 +              //and it has expired.
 +              if (evh.endOfLifeTimer <= timeout) {
 +                evh.removed = true;
 +                if (logger.isTraceEnabled()) {
 +                  logger.trace("{} sweeper: removing bulkOp {}", tracker.name, e.getKey());
 +                }
 +                it2.remove();
 +              }
 +            }
 +          }
 +          if (traceEnabled) {
 +            logger.trace("{} sweeper: done", tracker.name);
 +          }
 +        }
 +      }
 +    }
 +    
 +  }
 +}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
index 201acc0,0000000..201acc0
mode 100644,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
index f4216ac,0000000..de49fea
mode 100644,000000..100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXManagerImpl.java
@@@ -1,1511 -1,0 +1,1511 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package com.gemstone.gemfire.internal.cache;
 +
 +import java.io.DataInput;
 +import java.io.DataOutput;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedHashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Map.Entry;
 +import java.util.Queue;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.ConcurrentLinkedQueue;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 +import java.util.concurrent.locks.LockSupport;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.DataSerializer;
 +import com.gemstone.gemfire.GemFireException;
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.SystemFailure;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.CacheTransactionManager;
 +import com.gemstone.gemfire.cache.CommitConflictException;
 +import com.gemstone.gemfire.cache.TransactionDataRebalancedException;
 +import com.gemstone.gemfire.cache.TransactionId;
 +import com.gemstone.gemfire.cache.TransactionInDoubtException;
 +import com.gemstone.gemfire.cache.TransactionListener;
 +import com.gemstone.gemfire.cache.TransactionWriter;
 +import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
 +import com.gemstone.gemfire.distributed.TXManagerCancelledException;
 +import com.gemstone.gemfire.distributed.internal.DM;
 +import com.gemstone.gemfire.distributed.internal.DistributionManager;
 +import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
 +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 +import com.gemstone.gemfire.distributed.internal.MembershipListener;
 +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 +import com.gemstone.gemfire.internal.SystemTimer.SystemTimerTask;
 +import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.logging.LogService;
 +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 +import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap;
 +import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap.HashEntry;
 +import com.gemstone.gemfire.internal.util.concurrent.CustomEntryConcurrentHashMap.MapCallback;
 +
 +/** <p>The internal implementation of the {@link CacheTransactionManager}
 + * interface returned by {@link GemFireCacheImpl#getCacheTransactionManager}.
 + * Internal operations 
 +
 + </code>TransactionListener</code> invocation, Region synchronization, transaction statistics and
 +
 + * transaction logging are handled here 
 + * 
 + * @author Mitch Thomas
 + *
 + * @since 4.0
 + * 
 + * @see CacheTransactionManager
 + */
 +public final class TXManagerImpl implements CacheTransactionManager,
 +    MembershipListener {
 +
 +  private static final Logger logger = LogService.getLogger();
 +  
 +  // Thread specific context container
 +  private final ThreadLocal<TXStateProxy> txContext;
 +  private static TXManagerImpl currentInstance = null;
 +  // The unique transaction ID for this Manager 
 +  private final AtomicInteger uniqId;
 +
 +  private final DM dm;
 +  private final Cache cache;
 +
 +  // The DistributionMemberID used to construct TXId's
 +  private final InternalDistributedMember distributionMgrId;
 +
 +  private final CachePerfStats cachePerfStats;
 +
 +  private static final TransactionListener[] EMPTY_LISTENERS =
 +    new TransactionListener[0];
 +
 +  /**
 +   * Default transaction id to indicate no transaction
 +   */
 +  public static final int NOTX = -1;
 +  
 +  private final ArrayList<TransactionListener> txListeners = new ArrayList<TransactionListener>(8);
 +  public TransactionWriter writer = null;
 +  private boolean closed = false;
 +
 +  private final Map<TXId, TXStateProxy> hostedTXStates;
 +
 +  /**
 +   * the number of client initiated transactions to store for client failover
 +   */
 +  public final static int FAILOVER_TX_MAP_SIZE = Integer.getInteger("gemfire.transactionFailoverMapSize", 1000);
 +  
 +  /**
 +   * used to store TXCommitMessages for client initiated transactions, so that when a client failsover,
 +   * (after the delegate dies) the commit message can be sent to client.
 +   * //TODO we really need to keep around only one msg for each thread on a client
 +   */
 +  @SuppressWarnings("unchecked")
 +  private Map<TXId ,TXCommitMessage> failoverMap = Collections.synchronizedMap(new LinkedHashMap<TXId, TXCommitMessage>() {
 +    private static final long serialVersionUID = -4156018226167594134L;
 +
 +    protected boolean removeEldestEntry(Entry eldest) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("TX: removing client initiated transaction from failover map:{} :{}", eldest.getKey(), (size()>FAILOVER_TX_MAP_SIZE));
 +      }
 +      return size() > FAILOVER_TX_MAP_SIZE;
 +    };
 +  });
 +
 +  /**
 +   * A flag to allow persistent transactions. public for testing.
 +   */
 +  public static boolean ALLOW_PERSISTENT_TRANSACTIONS = Boolean.getBoolean("gemfire.ALLOW_PERSISTENT_TRANSACTIONS");
 +
 +  /**
 +   * this keeps track of all the transactions that were initiated locally.
 +   */
 +  private ConcurrentMap<TXId, TXStateProxy> localTxMap = new ConcurrentHashMap<TXId, TXStateProxy>();
 +
 +  /**
 +   * the time in minutes after which any suspended transaction are rolled back. default is 30 minutes
 +   */
 +  private volatile long suspendedTXTimeout = Long.getLong("gemfire.suspendedTxTimeout", 30);
 +  
 +  /**
 +   * Thread-specific flag to indicate whether the transactions managed by this
 +   * CacheTransactionManager for this thread should be distributed
 +   */
 +  private final ThreadLocal<Boolean> isTXDistributed;
 +  
 +
 +  /** Constructor that implements the {@link CacheTransactionManager}
 +   * interface. Only only one instance per {@link com.gemstone.gemfire.cache.Cache}
 +   *
 +   * @param cachePerfStats 
 +   */
 +  public TXManagerImpl(
 +                       CachePerfStats cachePerfStats,
 +                       Cache cache) {
 +    this.cache = cache;
 +    this.dm = ((InternalDistributedSystem)cache.getDistributedSystem())
 +        .getDistributionManager();
 +    this.distributionMgrId = this.dm.getDistributionManagerId();
 +    this.uniqId = new AtomicInteger(0);
 +    this.cachePerfStats = cachePerfStats;
 +    this.hostedTXStates = new HashMap<TXId, TXStateProxy>();
 +    this.txContext = new ThreadLocal<TXStateProxy>();
 +    this.isTXDistributed = new ThreadLocal<Boolean>();
 +    currentInstance = this;
 +  }
 +
 +  final Cache getCache() {
 +    return this.cache;
 +  }
 +  
 +  
 +  /**
 +   * Get the TransactionWriter for the cache
 +   * 
 +   * @return the current TransactionWriter
 +   * @see TransactionWriter
 +   */
 +  public final TransactionWriter getWriter() {
 +    return writer;
 +  }
 +  
 +
 +  public final void setWriter(TransactionWriter writer) {
 +    if (((GemFireCacheImpl)this.cache).isClient()) {
 +      throw new IllegalStateException(LocalizedStrings.TXManager_NO_WRITER_ON_CLIENT.toLocalizedString());
 +    }
 +    this.writer = writer;
 +  }
 +  
 +
 +  public final TransactionListener getListener() {
 +    synchronized (this.txListeners) {
 +      if (this.txListeners.isEmpty()) {
 +      return null;
 +      } else if (this.txListeners.size() == 1) {
 +        return this.txListeners.get(0);
 +      } else {
 +        throw new IllegalStateException(LocalizedStrings.TXManagerImpl_MORE_THAN_ONE_TRANSACTION_LISTENER_EXISTS.toLocalizedString());
 +      }
 +    }
 +  }
 +  
 +  public TransactionListener[] getListeners() {
 +    synchronized (this.txListeners) {
 +      int size = this.txListeners.size();
 +      if (size == 0) {
 +        return EMPTY_LISTENERS;
 +      } else {
 +        TransactionListener[] result = new TransactionListener[size];
 +        this.txListeners.toArray(result);
 +        return result;
 +      }
 +    }
 +  }
 +  
 +  public TransactionListener setListener(TransactionListener newListener) {
 +    synchronized (this.txListeners) {
 +      TransactionListener result = getListener();
 +      this.txListeners.clear();
 +      if (newListener != null) {
 +        this.txListeners.add(newListener);
 +      }
 +      if (result != null) {
 +        closeListener(result);
 +      }
 +      return result;
 +    }
 +  }
 +  public void addListener(TransactionListener aListener) {
 +    if (aListener == null) {
 +      throw new IllegalArgumentException(LocalizedStrings.TXManagerImpl_ADDLISTENER_PARAMETER_WAS_NULL.toLocalizedString());
 +    }
 +    synchronized (this.txListeners) {
 +      if (!this.txListeners.contains(aListener)) {
 +        this.txListeners.add(aListener);
 +      }
 +    }
 +  }
 +  public void removeListener(TransactionListener aListener) {
 +    if (aListener == null) {
 +      throw new IllegalArgumentException(LocalizedStrings.TXManagerImpl_REMOVELISTENER_PARAMETER_WAS_NULL.toLocalizedString());
 +    }
 +    synchronized (this.txListeners) {
 +      if (this.txListeners.remove(aListener)) {
 +        closeListener(aListener);
 +      }
 +    }
 +  }
 +  public void initListeners(TransactionListener[] newListeners) {
 +    synchronized (this.txListeners) {
 +      if (!this.txListeners.isEmpty()) {
 +        Iterator<TransactionListener> it = this.txListeners.iterator();
 +        while (it.hasNext()) {
 +          closeListener(it.next());
 +        }
 +        this.txListeners.clear();
 +      }
 +      if (newListeners != null && newListeners.length > 0) {
 +        List<TransactionListener> nl = Arrays.asList(newListeners);
 +        if (nl.contains(null)) {
 +          throw new IllegalArgumentException(LocalizedStrings.TXManagerImpl_INITLISTENERS_PARAMETER_HAD_A_NULL_ELEMENT.toLocalizedString());
 +        }
 +        this.txListeners.addAll(nl);
 +      }
 +    }
 +  }
 +
 +  final CachePerfStats getCachePerfStats() {
 +    return this.cachePerfStats;
 +  }
 +
 +  /** Build a new {@link TXId}, use it as part of the transaction
 +   * state and associate with the current thread using a {@link
 +   * ThreadLocal}.
 +   */
 +  public void begin() {
 +    checkClosed();
 +    {
 +      TransactionId tid = getTransactionId();
 +      if (tid != null) {
 +        throw new java.lang.IllegalStateException(LocalizedStrings.TXManagerImpl_TRANSACTION_0_ALREADY_IN_PROGRESS.toLocalizedString(tid));
 +      }
 +    }
 +    TXId id = new TXId(this.distributionMgrId, this.uniqId.incrementAndGet());
 +    TXStateProxyImpl proxy = null;
 +    if (isDistributed()) {
 +      proxy = new DistTXStateProxyImplOnCoordinator(this, id, null);  
 +    } else {
 +      proxy = new TXStateProxyImpl(this, id, null);  
 +    }
 +    setTXState(proxy);
 +    this.localTxMap.put(id, proxy);
 +  }
 +
 +
 +  /** Build a new {@link TXId}, use it as part of the transaction
 +   * state and associate with the current thread using a {@link
 +   * ThreadLocal}. Flag the transaction to be enlisted with a JTA
 +   * Transaction.  Should only be called in a context where we know
 +   * there is no existing transaction.
 +   */
 +  public TXStateProxy beginJTA() {
 +    checkClosed();
 +    TXId id = new TXId(this.distributionMgrId, this.uniqId.incrementAndGet());
 +    TXStateProxy newState = null;
 +    
 +    if (isDistributed()) {
 +      newState = new DistTXStateProxyImplOnCoordinator(this, id, true);
 +    } else {
 +      newState = new TXStateProxyImpl(this, id, true);
 +    }
 +    setTXState(newState);
 +    return newState;
 +  }
 +
 +  /*
 +   * Only applicable for Distributed transaction.
 +   */
 +  public void precommit() throws CommitConflictException {
 +    checkClosed();
 +
 +    final TXStateProxy tx = getTXState();
 +    if (tx == null) {
 +      throw new IllegalStateException(LocalizedStrings.TXManagerImpl_THREAD_DOES_NOT_HAVE_AN_ACTIVE_TRANSACTION.toLocalizedString());
 +    }
 +    
 +    tx.checkJTA(LocalizedStrings.TXManagerImpl_CAN_NOT_COMMIT_THIS_TRANSACTION_BECAUSE_IT_IS_ENLISTED_WITH_A_JTA_TRANSACTION_USE_THE_JTA_MANAGER_TO_PERFORM_THE_COMMIT.toLocalizedString());
 +  
 +    tx.precommit();
 +  }
 +  
 +  /** Complete the transaction associated with the current
 +   *  thread. When this method completes, the thread is no longer
 +   *  associated with a transaction.
 +   *
 +   */
 +  public void commit() throws CommitConflictException {
 +    checkClosed();
 +
 +    final TXStateProxy tx = getTXState();
 +    if (tx == null) {
 +      throw new IllegalStateException(LocalizedStrings.TXManagerImpl_THREAD_DOES_NOT_HAVE_AN_ACTIVE_TRANSACTION.toLocalizedString());
 +    }
 +
 +    tx.checkJTA(LocalizedStrings.TXManagerImpl_CAN_NOT_COMMIT_THIS_TRANSACTION_BECAUSE_IT_IS_ENLISTED_WITH_A_JTA_TRANSACTION_USE_THE_JTA_MANAGER_TO_PERFORM_THE_COMMIT.toLocalizedString());
 +
 +    final long opStart = CachePerfStats.getStatTime();
 +    final long lifeTime = opStart - tx.getBeginTime();
 +    try {
 +      setTXState(null);
 +      tx.commit();
 +    } catch (CommitConflictException ex) {
 +      saveTXStateForClientFailover(tx, TXCommitMessage.CMT_CONFLICT_MSG); //fixes #43350
 +      noteCommitFailure(opStart, lifeTime, tx);
 +      cleanup(tx.getTransactionId()); // fixes #52086
 +      throw ex;
 +    } catch (TransactionDataRebalancedException reb) {
 +      saveTXStateForClientFailover(tx, TXCommitMessage.REBALANCE_MSG);
 +      cleanup(tx.getTransactionId()); // fixes #52086
 +      throw reb;
 +    } catch (UnsupportedOperationInTransactionException e) {
 +      // fix for #42490
 +      setTXState(tx);
 +      throw e;
 +    } catch (RuntimeException e) {
 +      saveTXStateForClientFailover(tx, TXCommitMessage.EXCEPTION_MSG);
 +      cleanup(tx.getTransactionId()); // fixes #52086
 +      throw e;
 +    }
 +    saveTXStateForClientFailover(tx);
 +    cleanup(tx.getTransactionId());
 +    noteCommitSuccess(opStart, lifeTime, tx);
 +  }
 +
 +  final void noteCommitFailure(long opStart, long lifeTime, TXStateInterface tx) {
 +    long opEnd = CachePerfStats.getStatTime();
 +    this.cachePerfStats.txFailure(opEnd - opStart,
 +                                  lifeTime, tx.getChanges());
 +    TransactionListener[] listeners = getListeners();
 +    if (tx.isFireCallbacks() && listeners.length > 0) {
 +      final TXEvent e = tx.getEvent();
 +      try {
 +      for (int i=0; i < listeners.length; i++) {
 +        try {
 +          listeners[i].afterFailedCommit(e);
 +        } 
 +        catch (VirtualMachineError err) {
 +          SystemFailure.initiateFailure(err);
 +          // If this ever returns, rethrow the error.  We're poisoned
 +          // now, so don't let this thread continue.
 +          throw err;
 +        }
 +        catch (Throwable t) {
 +          // Whenever you catch Error or Throwable, you must also
 +          // catch VirtualMachineError (see above).  However, there is
 +          // _still_ a possibility that you are dealing with a cascading
 +          // error condition, so you also need to check to see if the JVM
 +          // is still usable:
 +          SystemFailure.checkFailure();
 +          logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), t);
 +        }
 +      }
 +      } finally {
 +        e.release();
 +      }
 +    }
 +  }
 +
 +  final void noteCommitSuccess(long opStart, long lifeTime, TXStateInterface tx) {
 +    long opEnd = CachePerfStats.getStatTime();
 +    this.cachePerfStats.txSuccess(opEnd - opStart,
 +                                  lifeTime, tx.getChanges());
 +    TransactionListener[] listeners = getListeners();
 +    if (tx.isFireCallbacks() && listeners.length > 0) {
 +      final TXEvent e = tx.getEvent();
 +      try {
 +      for (final TransactionListener listener : listeners) {
 +        try {
 +          listener.afterCommit(e);
 +        } 
 +        catch (VirtualMachineError err) {
 +          SystemFailure.initiateFailure(err);
 +          // If this ever returns, rethrow the error.  We're poisoned
 +          // now, so don't let this thread continue.
 +          throw err;
 +        }
 +        catch (Throwable t) {
 +          // Whenever you catch Error or Throwable, you must also
 +          // catch VirtualMachineError (see above).  However, there is
 +          // _still_ a possibility that you are dealing with a cascading
 +          // error condition, so you also need to check to see if the JVM
 +          // is still usable:
 +          SystemFailure.checkFailure();
 +          logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), t);
 +        }
 +      }
 +      } finally {
 +        e.release();
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * prepare for transaction replay by assigning a new tx id to the current proxy
 +   */
 +  private void _incrementTXUniqueIDForReplay() {
 +    TXStateProxyImpl tx = (TXStateProxyImpl)getTXState();
 +    assert tx != null : "expected a transaction to be in progress";
 +    TXId id = new TXId(this.distributionMgrId, this.uniqId.incrementAndGet());
 +    tx.setTXIDForReplay(id);
 +  }
 +
 +
 +  /** Roll back the transaction associated with the current
 +   *  thread. When this method completes, the thread is no longer
 +   *  associated with a transaction.
 +   */
 +  public void rollback() {
 +    checkClosed();
 +    TXStateProxy tx = getTXState();
 +    if (tx == null) {
 +      throw new IllegalStateException(LocalizedStrings.TXManagerImpl_THREAD_DOES_NOT_HAVE_AN_ACTIVE_TRANSACTION.toLocalizedString());
 +    }
 +
 +    tx.checkJTA(LocalizedStrings.TXManagerImpl_CAN_NOT_ROLLBACK_THIS_TRANSACTION_IS_ENLISTED_WITH_A_JTA_TRANSACTION_USE_THE_JTA_MANAGER_TO_PERFORM_THE_ROLLBACK.toLocalizedString());
 +
 +    final long opStart = CachePerfStats.getStatTime();
 +    final long lifeTime = opStart - tx.getBeginTime();
 +    setTXState(null);
 +    tx.rollback();
 +    saveTXStateForClientFailover(tx);
 +    cleanup(tx.getTransactionId());
 +    noteRollbackSuccess(opStart, lifeTime, tx);
 +  }
 +  
 +  final void noteRollbackSuccess(long opStart, long lifeTime, TXStateInterface tx) {
 +    long opEnd = CachePerfStats.getStatTime();
 +    this.cachePerfStats.txRollback(opEnd - opStart,
 +                                   lifeTime, tx.getChanges());
 +    TransactionListener[] listeners = getListeners();
 +    if (tx.isFireCallbacks() && listeners.length > 0) {
 +      final TXEvent e = tx.getEvent();
 +      try {
 +      for (int i = 0; i < listeners.length; i++) {
 +        try {
 +          listeners[i].afterRollback(e);
 +        } 
 +        catch (VirtualMachineError err) {
 +          SystemFailure.initiateFailure(err);
 +          // If this ever returns, rethrow the error.  We're poisoned
 +          // now, so don't let this thread continue.
 +          throw err;
 +        }
 +        catch (Throwable t) {
 +          // Whenever you catch Error or Throwable, you must also
 +          // catch VirtualMachineError (see above).  However, there is
 +          // _still_ a possibility that you are dealing with a cascading
 +          // error condition, so you also need to check to see if the JVM
 +          // is still usable:
 +          SystemFailure.checkFailure();
 +          logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), t);
 +        }
 +      }
 +      } finally {
 +        e.release();
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Called from Commit and Rollback to unblock waiting threads
 +   */
 +  private void cleanup(TransactionId txId) {
 +    TXStateProxy proxy = this.localTxMap.remove(txId);
 +    if (proxy != null) {
 +      proxy.close();
 +    }
 +    Queue<Thread> waitingThreads = this.waitMap.get(txId);
 +    if (waitingThreads != null && !waitingThreads.isEmpty()) {
 +      for (Thread waitingThread : waitingThreads) {
 +        LockSupport.unpark(waitingThread);
 +      }
 +      waitMap.remove(txId);
 +    }
 +  }
 +  
 +  /** Reports the existance of a Transaction for this thread
 +   *
 +   */
 +  public boolean exists() {
 +    return null != getTXState();
 +  }
 +
 +  /** Gets the current transaction identifier or null if no transaction exists
 +   *
 +   */
 +  public TransactionId getTransactionId() {
 +    TXStateProxy t = getTXState();
 +    TransactionId ret = null;
 +    if (t!=null) {
 +      ret = t.getTransactionId();
 +    } 
 +    return ret;
 +  }
 +
 +  /** 
 +   * Returns the TXStateProxyInterface of the current thread; null if no transaction.
 +   */
 +  public final TXStateProxy getTXState() {
 +    TXStateProxy tsp = txContext.get();
 +    if (tsp != null && !tsp.isInProgress()) {
 +      this.txContext.set(null);
 +      tsp = null;
 +    }
 +    return tsp;
 +  }
 +
 +  /**
 +   * sets {@link TXStateProxy#setInProgress(boolean)} when a txContext is present.
 +   * This method must only be used in fail-over scenarios.
 +   * @param progress value of the progress flag to be set
 +   * @return the previous value of inProgress flag
 +   * @see TXStateProxy#setInProgress(boolean)
 +   */
 +  public boolean setInProgress(boolean progress) {
 +    boolean retVal = false;
 +    TXStateProxy tsp = txContext.get();
 +    if (tsp != null) {
 +      retVal = tsp.isInProgress();
 +      tsp.setInProgress(progress);
 +    }
 +    return retVal;
 +  }
 +
 +  public final void setTXState(TXStateProxy val) {
 +    txContext.set(val);
 +  }
 +
 +
 +  public void close() {
 +    if (isClosed()) {
 +      return;
 +    }
 +    this.closed = true;
 +    for (TXStateProxy proxy: this.hostedTXStates.values()) {
 +      proxy.close();
 +    }
 +    for (TXStateProxy proxy: this.localTxMap.values()) {
 +      proxy.close();
 +    }
 +    {
 +      TransactionListener[] listeners = getListeners();
 +      for (int i=0; i < listeners.length; i++) {
 +        closeListener(listeners[i]);
 +      }
 +    }
 +  }
 +  private void closeListener(TransactionListener tl) {
 +    try {
 +      tl.close();
 +    } 
 +    catch (VirtualMachineError err) {
 +      SystemFailure.initiateFailure(err);
 +      // If this ever returns, rethrow the error.  We're poisoned
 +      // now, so don't let this thread continue.
 +      throw err;
 +    }
 +    catch (Throwable t) {
 +      // Whenever you catch Error or Throwable, you must also
 +      // catch VirtualMachineError (see above).  However, there is
 +      // _still_ a possibility that you are dealing with a cascading
 +      // error condition, so you also need to check to see if the JVM
 +      // is still usable:
 +      SystemFailure.checkFailure();
 +      logger.error(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_OCCURRED_IN_TRANSACTIONLISTENER), t);
 +    }
 +  }
 +  /**
 +   * If the current thread is in a transaction then suspend will
 +   * cause it to no longer be in a transaction.
 +   * @return the state of the transaction or null. Pass this value
 +   *  to {@link TXManagerImpl#resume} to reactivate the suspended transaction.
 +   */
 +  public final TXStateProxy internalSuspend() {
 +    TXStateProxy result = getTXState();
 +    if (result != null) {
 +      result.suspend();
 +      setTXState(null);
 +    }
 +    return result;
 +  }
 +  /**
 +   * Activates the specified transaction on the calling thread.
 +   * @param tx the transaction to activate.
 +   * @throws IllegalStateException if this thread already has an active transaction
 +   */
 +  public final void resume(TXStateProxy tx) {
 +    if (tx != null) {
 +      TransactionId tid = getTransactionId();
 +      if (tid != null) {
 +        throw new java.lang.IllegalStateException(LocalizedStrings.TXManagerImpl_TRANSACTION_0_ALREADY_IN_PROGRESS.toLocalizedString(tid));
 +      }
 +      if (tx instanceof TXState) {
 +        throw new java.lang.IllegalStateException("Found instance of TXState: " + tx);
 +      }
 +      setTXState(tx);
 +      tx.resume();
 +      SystemTimerTask task = this.expiryTasks.remove(tx.getTransactionId());
 +      if (task != null) {
 +        task.cancel();
 +      }
 +    }
 +  }
 +
 +  private final boolean isClosed() {
 +    return this.closed;
 +  }
 +  private final void checkClosed() {
 +    cache.getCancelCriterion().checkCancelInProgress(null);
 +    if (this.closed) {
 +      throw new TXManagerCancelledException("This transaction manager is closed.");
 +    }
 +  }
 +
 +  final DM getDM() {
 +    return this.dm;
 +  }
 +
 +  
 +  public static int getCurrentTXUniqueId() {
 +    if(currentInstance==null) {
 +      return NOTX;
 +    }
 +    return currentInstance.getMyTXUniqueId();
 +  }
 +  
 +  
 +  
 +  
 +  public final static TXStateProxy getCurrentTXState() {
 +    if(currentInstance==null) {
 +      return null;
 +    }
 +    return currentInstance.getTXState();
 +  }
 +  
 +  public static void incrementTXUniqueIDForReplay() {
 +    if(currentInstance != null) {
 +      currentInstance._incrementTXUniqueIDForReplay();
 +    }
 +  }
 +  
 +  public int getMyTXUniqueId() {
 +    TXStateProxy t = txContext.get();
 +    if (t != null) {
 +      return t.getTxId().getUniqId();
 +    } else {
 +      return NOTX;
 +    }
 +  }
 +
 +  /**
 +   * Associate the remote txState with the thread processing this message. Also,
 +   * we acquire a lock on the txState, on which this thread operates.
 +   * Some messages like SizeMessage should not create a new txState.
 +   * @param msg
 +   * @return {@link TXStateProxy} the txProxy for the transactional message
 +   * @throws InterruptedException 
 +   */
 +  public TXStateProxy masqueradeAs(TransactionMessage msg) throws InterruptedException {
 +    if (msg.getTXUniqId() == NOTX || !msg.canParticipateInTransaction()) {
 +      return null;
 +    }
 +    TXId key = new TXId(msg.getMemberToMasqueradeAs(), msg.getTXUniqId());
 +    TXStateProxy val;
 +    val = this.hostedTXStates.get(key);
 +    if (val == null) {
 +      synchronized(this.hostedTXStates) {
 +        val = this.hostedTXStates.get(key);
 +        if (val == null && msg.canStartRemoteTransaction()) {
 +          if (msg.isTransactionDistributed()) {
 +            val = new DistTXStateProxyImplOnDatanode(this, key, msg.getTXOriginatorClient());
 +            val.setLocalTXState(new DistTXState(val,true));
 +          } else {
 +            val = new TXStateProxyImpl(this, key, msg.getTXOriginatorClient());
 +            val.setLocalTXState(new TXState(val,true));
 +          }
 +          this.hostedTXStates.put(key, val);
 +        }
 +      }
 +    }
 +    if (val != null) {
 +      if (!val.getLock().isHeldByCurrentThread()) {
 +        val.getLock().lock();
 +      }
 +    }
 +
 +    setTXState(val);
 +    return val;
 +  }
 +  
 +  /**
 +   * Associate the remote txState with the thread processing this message. Also,
 +   * we acquire a lock on the txState, on which this thread operates.
 +   * Some messages like SizeMessage should not create a new txState.
 +   * @param msg
 +   * @param memberId
 +   * @param probeOnly - do not masquerade; just look up the TX state
 +   * @return {@link TXStateProxy} the txProxy for the transactional message
 +   * @throws InterruptedException 
 +   */
 +  public TXStateProxy masqueradeAs(Message msg,InternalDistributedMember memberId, boolean probeOnly) throws InterruptedException {
 +    if (msg.getTransactionId() == NOTX) {
 +      return null;
 +    }
 +    TXId key = new TXId(memberId, msg.getTransactionId());
 +    TXStateProxy val;
 +    val = this.hostedTXStates.get(key);
 +    if (val == null) {
 +      synchronized(this.hostedTXStates) {
 +        val = this.hostedTXStates.get(key);
-         if (val == null && msg.canStartRemoteTransaction()) {
++        if (val == null) {
 +          // [sjigyasu] TODO: Conditionally create object based on distributed or non-distributed tx mode 
 +          if (msg instanceof TransactionMessage && ((TransactionMessage)msg).isTransactionDistributed()) {
 +            val = new DistTXStateProxyImplOnDatanode(this, key, memberId);
 +            //val.setLocalTXState(new DistTXState(val,true));
 +          } else {
 +            val = new TXStateProxyImpl(this, key, memberId);
 +            //val.setLocalTXState(new TXState(val,true));
 +          }
 +          this.hostedTXStates.put(key, val);
 +        }
 +      }
 +    }
 +    if (!probeOnly) {
 +      if (val != null) {
 +        if (!val.getLock().isHeldByCurrentThread()) {
 +          val.getLock().lock();
 +          // add the TXStateProxy back to the map
 +          // in-case another thread removed it while we were waiting to lock.
 +          // This can happen during client transaction failover.
 +          synchronized (this.hostedTXStates) {
 +            this.hostedTXStates.put(key, val);
 +          }
 +        }
 +      }
 +      setTXState(val);
 +    }
 +    return val;
 +  }
 +  
 +  
 +  
 +  
 +  /**
 +   * Associate the transactional state with this thread.
 +   * @param txState the transactional state.
 +   */
 +  public void masqueradeAs(TXStateProxy txState) {
 +    assert txState != null;
 +    if (!txState.getLock().isHeldByCurrentThread()) {
 +      txState.getLock().lock();
 +    }
 +    setTXState(txState);
 +  }
 +
 +  /**
 +   * Remove the association created by {@link #masqueradeAs(TransactionMessage)}
 +   * @param tx
 +   */
 +  public void unmasquerade(TXStateProxy tx) {
 +    if (tx != null) {
 +      setTXState(null);
 +      tx.getLock().unlock();
 +    }
 +  }
 +
 +  /**
 +   * Cleanup the remote txState after commit and rollback
 +   * @param txId  
 +   * @return the TXStateProxy
 +   */
 +  public TXStateProxy removeHostedTXState(TXId txId) {
 +    synchronized (this.hostedTXStates) {
 +      TXStateProxy result = this.hostedTXStates.remove(txId);
 +      if (result != null) {
 +        result.close();
 +      }
 +      return result;
 +    }
 +  }
 +  
 +  /**
 +   * Called when the CacheServer is shutdown.
 +   * Removes txStates hosted on client's behalf
 +   */
 +  protected void removeHostedTXStatesForClients() {
 +    synchronized (this.hostedTXStates) {
 +      Iterator<Entry<TXId, TXStateProxy>> iterator = this.hostedTXStates.entrySet().iterator();
 +      while (iterator.hasNext()) {
 +        Entry<TXId, TXStateProxy> entry = iterator.next();
 +        if (entry.getValue().isOnBehalfOfClient()) {
 +          entry.getValue().close();
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("Cleaning up TXStateProxy for {}", entry.getKey());
 +          }
 +          iterator.remove();
 +        }
 +      }
 +    }
 +  }
 +  
 +  /**
 +   * Used to verify if a transaction with a given id is hosted by this txManager.
 +   * @param txId
 +   * @return true if the transaction is in progress, false otherwise
 +   */
 +  public boolean isHostedTxInProgress(TXId txId) {
 +    synchronized (this.hostedTXStates) {
 +      TXStateProxy tx = this.hostedTXStates.get(txId);
 +      if (tx == null) {
 +        return false;
 +      }
 +      return tx.isRealDealLocal();
 +    }
 +  }
 +
 +  public TXStateProxy getHostedTXState(TXId txId) {
 +    synchronized (this.hostedTXStates) {
 +      return this.hostedTXStates.get(txId);
 +    }
 +  }
 +  
 +  /**
 +   * @return number of transaction in progress on behalf of remote nodes
 +   */
 +  public int hostedTransactionsInProgressForTest() {
 +    synchronized (this.hostedTXStates) {
 +      return this.hostedTXStates.size();
 +    }
 +  }
 +  public int localTransactionsInProgressForTest() {
 +    return this.localTxMap.size();
 +  }
 +
 +  public void memberDeparted(InternalDistributedMember id, boolean crashed) {
 +    synchronized (this.hostedTXStates) {
 +      Iterator<Map.Entry<TXId,TXStateProxy>> iterator = this.hostedTXStates.entrySet().iterator();
 +      while (iterator.hasNext()) {
 +        Map.Entry<TXId,TXStateProxy> me = iterator.next();
 +        TXId txId = me.getKey();
 +        if (txId.getMemberId().equals(id)) {
 +          me.getValue().close();
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("Received memberDeparted, cleaning up txState:{}", txId);
 +          }
 +          iterator.remove();
 +        }
 +      }
 +    }
 +  }
 +
 +  public void memberJoined(InternalDistributedMember id) {
 +  }
 +
 +  public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {
 +  }
 +
 +  public void memberSuspect(InternalDistributedMember id,
 +      InternalDistributedMember whoSuspected, String reason) {
 +  }
 +  
 +
 +  /**
 +   * retrieve the transaction states for the given client
 +   * @param id the client's membership ID
 +   * @return a set of the currently open transaction states
 +   */
 +  public Set<TXId> getTransactionsForClient(InternalDistributedMember id) {
 +    Set<TXId> result = new HashSet<TXId>();
 +    synchronized (this.hostedTXStates) {
 +      for (Map.Entry<TXId, TXStateProxy> entry: this.hostedTXStates.entrySet()) {
 +        if (entry.getKey().getMemberId().equals(id)) {
 +          result.add(entry.getKey());
 +        }
 +      }
 +    }
 +    return result;
 +  }
 +
 +  /** remove the given TXStates */
 +  public void removeTransactions(Set<TXId> txIds, boolean distribute) {
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("expiring the following transactions: {}", txIds);
 +    }
 +    synchronized (this.hostedTXStates) {
 +      Iterator<Map.Entry<TXId, TXStateProxy>> iterator = this.hostedTXStates.entrySet().iterator();
 +      while (iterator.hasNext()) {
 +        Map.Entry<TXId,TXStateProxy> entry = iterator.next();
 +        if (txIds.contains(entry.getKey())) {
 +          entry.getValue().close();
 +          iterator.remove();
 +        }
 +      }
 +    }
 +    if (distribute) {
 +      // tell other VMs to also remove the transactions
 +      TXRemovalMessage.send(this.dm, this.dm.getOtherDistributionManagerIds(), txIds);
 +    }
 +  }
 +
 +  private void saveTXStateForClientFailover(TXStateProxy tx) {
 +    if (tx.isOnBehalfOfClient() && tx.isRealDealLocal()) {
 +      failoverMap.put(tx.getTxId(), tx.getCommitMessage());
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("TX: storing client initiated transaction:{}; now there are {} entries in the failoverMap",
 +            tx.getTxId(), failoverMap.size());
 +      }
 +    }
 +  }
 +
 +  private void saveTXStateForClientFailover(TXStateProxy tx, TXCommitMessage msg) {
 +    if (tx.isOnBehalfOfClient() && tx.isRealDealLocal()) {
 +      failoverMap.put(tx.getTxId(), msg);
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("TX: storing client initiated transaction:{}; now there are {} entries in the failoverMap",
 +            tx.getTxId(), failoverMap.size());
 +      }
 +    }
 +  }
 +
 +  public void saveTXCommitMessageForClientFailover(TXId txId, TXCommitMessage msg) {
 +    failoverMap.put(txId, msg);
 +  }
 +  
 +  public boolean isHostedTxRecentlyCompleted(TXId txId) {
 +    // if someone is asking to see if we have the txId, they will come
 +    // back and ask for the commit message, this could take a long time
 +    // specially when called from TXFailoverCommand, so we move
 +    // the txId to the front of the queue
 +    TXCommitMessage msg = failoverMap.remove(txId);
 +    if (msg != null) {
 +      failoverMap.put(txId, msg);
 +      return true;
 +    }
 +    return false;
 +  }
 +  
 +  
 +  /**
 +   * If the given transaction is already being completed by another thread
 +   * this will wait for that completion to finish and will ensure that
 +   * the result is saved in the client failover map.
 +   * @param txId
 +   * @return true if a wait was performed
 +   */
 +  public boolean waitForCompletingTransaction(TXId txId) {
 +    TXStateProxy val;
 +    val = this.hostedTXStates.get(txId);
 +    if (val == null) {
 +      synchronized(this.hostedTXStates) {
 +        val = this.hostedTXStates.get(txId);
 +      }
 +    }
 +    if (val != null && val.isRealDealLocal()) {
 +      TXStateProxyImpl impl = (TXStateProxyImpl)val;
 +      TXState state = impl.getLocalRealDeal();
 +      if (state.waitForPreviousCompletion()) {
 +        // the thread we were waiting for would have put a TXCommitMessage
 +        // in the failover map, doing so here may replace an existing token
 +        // like TXCommitMessage.REBALANCE_MSG with null. fixes bug 42661
 +        //saveTXStateForClientFailover(impl);
 +        return true;
 +      }
 +    }
 +    return false;
 +  }
 +  
 +  /**
 +   * Returns the TXCommitMessage for a transaction that has been
 +   * successfully completed.
 +   * @param txId
 +   * @return the commit message or an exception token e.g 
 +   * {@link TXCommitMessage#CMT_CONFLICT_MSG} if the transaction
 +   * threw an exception
 +   * @see #isExceptionToken(TXCommitMessage)
 +   */
 +  public TXCommitMessage getRecentlyCompletedMessage(TXId txId) {
 +    return failoverMap.get(txId);
 +  }
 +
 +  /**
 +   * @param msg
 +   * @return true if msg is an exception token, false otherwise
 +   */
 +  public boolean isExceptionToken(TXCommitMessage msg) {
 +    if (msg == TXCommitMessage.CMT_CONFLICT_MSG
 +        || msg == TXCommitMessage.REBALANCE_MSG
 +        || msg == TXCommitMessage.EXCEPTION_MSG) {
 +      return true;
 +    }
 +    return false;
 +  }
 +  
 +  /**
 +   * Generates exception messages for the three TXCommitMessage tokens that represent
 +   * exceptions during transaction execution. 
 +   * @param msg the token that represents the exception
 +   * @param txId
 +   * @return the exception
 +   */
 +  public RuntimeException getExceptionForToken(TXCommitMessage msg, TXId txId) {
 +    if (msg == TXCommitMessage.CMT_CONFLICT_MSG) {
 +      return new CommitConflictException(LocalizedStrings.
 +            TXState_CONFLICT_DETECTED_IN_GEMFIRE_TRANSACTION_0.toLocalizedString(txId));
 +    }
 +    if (msg == TXCommitMessage.REBALANCE_MSG) {
 +      return new TransactionDataRebalancedException(LocalizedStrings.
 +          PartitionedRegion_TRANSACTIONAL_DATA_MOVED_DUE_TO_REBALANCING.toLocalizedString());
 +    }
 +    if (msg == TXCommitMessage.EXCEPTION_MSG) {
 +      return new TransactionInDoubtException(LocalizedStrings.
 +          ClientTXStateStub_COMMIT_FAILED_ON_SERVER.toLocalizedString());
 +    }
 +    throw new InternalGemFireError("the parameter TXCommitMessage is not an exception token");
 +  }
 +  
 +  public static class TXRemovalMessage extends HighPriorityDistributionMessage {
 +
 +    Set<TXId> txIds;
 +
 +    /** for deserialization */
 +    public TXRemovalMessage() {
 +    }
 +
 +    static void send(DM dm, Set recipients, Set<TXId> txIds) {
 +      TXRemovalMessage msg = new TXRemovalMessage();
 +      msg.txIds = txIds;
 +      msg.setRecipients(recipients);
 +      dm.putOutgoing(msg);
 +    }
 +    
 +    @Override
 +    public void toData(DataOutput out) throws IOException {
 +      DataSerializer.writeHashSet((HashSet<TXId>)this.txIds, out);
 +    }
 +    
 +    @Override
 +    public void fromData(DataInput in) throws IOException, ClassNotFoundException {
 +      this.txIds = DataSerializer.readHashSet(in);
 +    }
 +
 +    public int getDSFID() {
 +      return TX_MANAGER_REMOVE_TRANSACTIONS;
 +    }
 +
 +    @Override
 +    protected void process(DistributionManager dm) {
 +      GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
 +      if (cache != null) {
 +        TXManagerImpl mgr = cache.getTXMgr();
 +        mgr.removeTransactions(this.txIds, false);
 +      }
 +    }
 +    
 +  }
 +
 +  private ConcurrentMap<TransactionId, TXStateProxy> suspendedTXs = new ConcurrentHashMap<TransactionId, TXStateProxy>();
 +  
 +  public TransactionId suspend() {
 +    return suspend(TimeUnit.MINUTES);
 +  }
 +  
 +  TransactionId suspend(TimeUnit expiryTimeUnit) {
 +    TXStateProxy result = getTXState();
 +    if (result != null) {
 +      TransactionId txId = result.getTransactionId();
 +      internalSuspend();
 +      this.suspendedTXs.put(txId, result);
 +      // wake up waiting threads
 +      Queue<Thread> waitingThreads = this.waitMap.get(txId);
 +      if (waitingThreads != null) {
 +        Thread waitingThread = null;
 +        while (true) {
 +          waitingThread = waitingThreads.poll();
 +          if (waitingThread == null
 +              || !Thread.currentThread().equals(waitingThread)) {
 +            break;
 +          }
 +        }
 +        if (waitingThread != null) {
 +          LockSupport.unpark(waitingThread);
 +        }
 +      }
 +      scheduleExpiry(txId, expiryTimeUnit);
 +      return txId;
 +    }
 +    return null;
 +  }
 +
 +  public void resume(TransactionId transactionId) {
 +    if (transactionId == null) {
 +      throw new IllegalStateException(
 +          LocalizedStrings.TXManagerImpl_UNKNOWN_TRANSACTION_OR_RESUMED
 +              .toLocalizedString());
 +    }
 +    if (getTXState() != null) {
 +      throw new IllegalStateException(
 +          LocalizedStrings.TXManagerImpl_TRANSACTION_ACTIVE_CANNOT_RESUME
 +              .toLocalizedString());
 +    }
 +    TXStateProxy txProxy = this.suspendedTXs.remove(transactionId);
 +    if (txProxy == null) {
 +      throw new IllegalStateException(
 +          LocalizedStrings.TXManagerImpl_UNKNOWN_TRANSACTION_OR_RESUMED
 +              .toLocalizedString());
 +    }
 +    resume(txProxy);
 +  }
 +
 +  public boolean isSuspended(TransactionId transactionId) {
 +    return this.suspendedTXs.containsKey(transactionId);
 +  }
 +  
 +  public boolean tryResume(TransactionId transactionId) {
 +    if (transactionId == null || getTXState() != null) {
 +      return false;
 +    }
 +    TXStateProxy txProxy = this.suspendedTXs.remove(transactionId);
 +    if (txProxy != null) {
 +      resume(txProxy);
 +      return true;
 +    }
 +    return false;
 +  }
 +
 +  /**
 +   * this map keeps track of all the threads that are waiting in
 +   * {@link #tryResume(TransactionId, long, TimeUnit)} for a particular
 +   * transactionId
 +   */
 +  private ConcurrentMap<TransactionId, Queue<Thread>> waitMap = new ConcurrentHashMap<TransactionId, Queue<Thread>>();
 +  
 +  public boolean tryResume(TransactionId transactionId, long time, TimeUnit unit) {
 +    if (transactionId == null || getTXState() != null || !exists(transactionId)) {
 +      return false;
 +    }
 +    Thread currentThread = Thread.currentThread();
 +    long timeout = unit.toNanos(time);
 +    long startTime = System.nanoTime();
 +    Queue<Thread> threadq = null;
 +
 +    try {
 +      while (true) {
 +        threadq = waitMap.get(transactionId);
 +        if (threadq == null) {
 +          threadq = new ConcurrentLinkedQueue<Thread>();
 +          Queue<Thread> oldq = waitMap.putIfAbsent(transactionId, threadq);
 +          if (oldq != null) {
 +            threadq = oldq;
 +          }
 +        }
 +        threadq.add(currentThread);
 +        // after putting this thread in waitMap, we should check for
 +        // an entry in suspendedTXs. if no entry is found in suspendedTXs
 +        // next invocation of suspend() will unblock this thread
 +        if (tryResume(transactionId)) {
 +          return true;
 +        } else if (!exists(transactionId)) {
 +          return false;
 +        }
 +        LockSupport.parkNanos(timeout);
 +        long nowTime = System.nanoTime();
 +        timeout -= nowTime - startTime;
 +        startTime = nowTime;
 +        if (timeout <= 0) {
 +          break;
 +        }
 +      }
 +    } finally {
 +      threadq = waitMap.get(transactionId);
 +      if (threadq != null) {
 +        threadq.remove(currentThread);
 +        // the queue itself will be removed at commit/rollback
 +      }
 +    }
 +    return false;
 +  }
 +
 +  public boolean exists(TransactionId transactionId) {
 +    return isHostedTxInProgress((TXId) transactionId)
 +        || isSuspended(transactionId)
 +        || this.localTxMap.containsKey(transactionId);
 +  }
 +
 +  /**
 +   * The timeout after which any suspended transactions are
 +   * rolled back if they are not resumed. If a negative
 +   * timeout is passed, suspended transactions will never expire.
 +   * @param timeout the timeout in minutes
 +   */
 +  public void setSuspendedTransactionTimeout(long timeout) {
 +    this.suspendedTXTimeout = timeout;
 +  }
 +
 +  /**
 +   * Return the timeout after which suspended transactions
 +   * are rolled back.
 +   * @return the timeout in minutes
 +   * @see #setSuspendedTransactionTimeout(long)
 +   */
 +  public long getSuspendedTransactionTimeout() {
 +    return this.suspendedTXTimeout;
 +  }
 +  
 +  /**
 +   * map to track the scheduled expiry tasks of suspended transactions.
 +   */
 +  private ConcurrentMap<TransactionId, SystemTimerTask> expiryTasks = new ConcurrentHashMap<TransactionId, SystemTimerTask>();
 +  
 +  /**
 +   * schedules the transaction to expire after {@link #suspendedTXTimeout}
 +   * @param txId
 +   * @param expiryTimeUnit the time unit to use when scheduling the expiration
 +   */
 +  private void scheduleExpiry(TransactionId txId, TimeUnit expiryTimeUnit) {
 +    final GemFireCacheImpl cache = (GemFireCacheImpl) this.cache;
 +    if (suspendedTXTimeout < 0) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("TX: transaction: {} not scheduled to expire", txId);
 +      }
 +      return;
 +    }
 +    SystemTimerTask task = new TXExpiryTask(txId);
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("TX: scheduling transaction: {} to expire after:{}", txId, suspendedTXTimeout);
 +    }
 +    cache.getCCPTimer().schedule(task, TimeUnit.MILLISECONDS.convert(suspendedTXTimeout, expiryTimeUnit));
 +    this.expiryTasks.put(txId, task);
 +  }
 +
 +  /**
 +   * Task scheduled to expire a transaction when it is suspended.
 +   * This task gets canceled if the transaction is resumed.
 +   * @author sbawaska
 +   */
 +  public static class TXExpiryTask extends SystemTimerTask {
 +
 +    /**
 +     * The txId to expire
 +     */
 +    private final TransactionId txId;
 +    
 +    public TXExpiryTask(TransactionId txId) {
 +      this.txId = txId;
 +    }
 +    @Override
 +    public void run2() {
 +      TXManagerImpl mgr = TXManagerImpl.currentInstance;
 +      TXStateProxy tx = mgr.suspendedTXs.remove(txId);
 +      if (tx != null) {
 +        try {
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("TX: Expiry task rolling back transaction: {}", txId);
 +          }
 +          tx.rollback();
 +        } catch (GemFireException e) {
 +          logger.warn(LocalizedMessage.create(LocalizedStrings.TXManagerImpl_EXCEPTION_IN_TRANSACTION_TIMEOUT, txId), e);
 +        }
 +      }
 +    }
 +  }
 +  private static class RefCountMapEntryCreator implements CustomEntryConcurrentHashMap.HashEntryCreator<AbstractRegionEntry, RefCountMapEntry> {
 +    @Override
 +    public HashEntry<AbstractRegionEntry, RefCountMapEntry> newEntry(AbstractRegionEntry key, int hash,
 +        HashEntry<AbstractRegionEntry, RefCountMapEntry> next, RefCountMapEntry value) {
 +      value.setNextEntry(next);
 +      return value;
 +    }
 +
 +    @Override
 +    public int keyHashCode(Object key, boolean compareValues) {
 +      // key will always be an AbstractRegionEntry because our map is strongly typed.
 +      return ((AbstractRegionEntry) key).getEntryHash();
 +    }
 +  }
 +  private static class RefCountMapEntry implements HashEntry<AbstractRegionEntry, RefCountMapEntry> {
 +    private final AbstractRegionEntry key;
 +    private HashEntry<AbstractRegionEntry, RefCountMapEntry> next;
 +    private volatile int refCount;
 +    private static final AtomicIntegerFieldUpdater<RefCountMapEntry> refCountUpdater
 +      = AtomicIntegerFieldUpdater.newUpdater(RefCountMapEntry.class, "refCount");    
 +    public RefCountMapEntry(AbstractRegionEntry k) {
 +      this.key = k;
 +      this.refCount = 1;
 +    }
 +
 +    @Override
 +    public AbstractRegionEntry getKey() {
 +      return this.key;
 +    }
 +
 +    @Override
 +    public boolean isKeyEqual(Object k) {
 +      return this.key.equals(k);
 +    }
 +
 +    @Override
 +    public RefCountMapEntry getMapValue() {
 +      return this;
 +    }
 +
 +    @Override
 +    public void setMapValue(RefCountMapEntry newValue) {
 +      if (newValue != this) {
 +        throw new IllegalStateException("Expected newValue " + newValue + " to be this " + this);
 +      }
 +    }
 +
 +    @Override
 +    public int getEntryHash() {
 +      return this.key.getEntryHash();
 +    }
 +
 +    @Override
 +    public HashEntry<AbstractRegionEntry, RefCountMapEntry> getNextEntry() {
 +      return this.next;
 +    }
 +
 +    @Override
 +    public void setNextEntry(HashEntry<AbstractRegionEntry, RefCountMapEntry> n) {
 +      this.next = n;
 +    }
 +
 +    public void incRefCount() {
 +      refCountUpdater.addAndGet(this, 1);
 +    }
 +
 +    /**
 +     * Returns true if refCount goes to 0.
 +     */
 +    public boolean decRefCount() {
 +      int rc = refCountUpdater.decrementAndGet(this);
 +      if (rc < 0) {
 +        throw new IllegalStateException("rc=" + rc);
 +      }
 +      return rc == 0;
 +    }
 +  }
 +  
 +  private final CustomEntryConcurrentHashMap<AbstractRegionEntry, RefCountMapEntry> refCountMap
 +    = new CustomEntryConcurrentHashMap<AbstractRegionEntry, RefCountMapEntry>(
 +        CustomEntryConcurrentHashMap.DEFAULT_INITIAL_CAPACITY, 
 +        CustomEntryConcurrentHashMap.DEFAULT_LOAD_FACTOR,
 +        CustomEntryConcurrentHashMap.DEFAULT_CONCURRENCY_LEVEL,
 +        true,
 +        new RefCountMapEntryCreator());
 +  
 +  private static final MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object> incCallback = new MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object>() {
 +    @Override
 +    public RefCountMapEntry newValue(AbstractRegionEntry key, Object context,
 +        Object createParams) {
 +      return new RefCountMapEntry(key);
 +    }
 +    @Override
 +    public void oldValueRead(RefCountMapEntry value) {
 +      value.incRefCount();
 +    }
 +    @Override
 +    public boolean doRemoveValue(RefCountMapEntry value, Object context,
 +        Object removeParams) {
 +      throw new IllegalStateException("doRemoveValue should not be called from create");
 +    }
 +  };
 +  
 +  private static final MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object> decCallback = new MapCallback<AbstractRegionEntry, RefCountMapEntry, Object, Object>() {
 +    @Override
 +    public RefCountMapEntry newValue(AbstractRegionEntry key, Object context,
 +        Object createParams) {
 +      throw new IllegalStateException("newValue should not be called from remove");
 +    }
 +    @Override
 +    public void oldValueRead(RefCountMapEntry value) {
 +      throw new IllegalStateException("oldValueRead should not be called from remove");
 +    }
 +    @Override
 +    public boolean doRemoveValue(RefCountMapEntry value, Object context,
 +        Object removeParams) {
 +      return value.decRefCount();
 +    }
 +  };
 +  
 +  public static final void incRefCount(AbstractRegionEntry re) {
 +    TXManagerImpl mgr = currentInstance;
 +    if (mgr != null) {
 +      mgr.refCountMap.create(re, incCallback, null, null, true);
 +    }
 +  }
 +  /**
 +   * Return true if refCount went to zero.
 +   */
 +  public static final boolean decRefCount(AbstractRegionEntry re) {
 +    TXManagerImpl mgr = currentInstance;
 +    if (mgr != null) {
 +      return mgr.refCountMap.removeConditionally(re, decCallback, null, null) != null;
 +    } else {
 +      return true;
 +    }
 +  }
 +
 +  // Used by tests
 +  public Set<TXId> getLocalTxIds() {
 +    return this.localTxMap.keySet();
 +  }
 +
 +  // Used by tests
 +  public ArrayList<TXId> getHostedTxIds() {
 +    synchronized (this.hostedTXStates) {
 +      return new ArrayList<TXId>(this.hostedTXStates.keySet());
 +    }
 +  }
 +  
 +  public void setDistributed(boolean flag) {
 +    checkClosed();
 +    TXStateProxy tx = getTXState();
 +    // Check whether given flag and current flag are different and whether a transaction is in progress
 +    if (tx != null && flag != isDistributed()) {
 +      // Cannot change mode in the middle of a transaction
 +      throw new java.lang.IllegalStateException(
 +          LocalizedStrings.TXManagerImpl_CANNOT_CHANGE_TRANSACTION_MODE_WHILE_TRANSACTIONS_ARE_IN_PROGRESS
 +              .toLocalizedString());
 +    } else {
 +      isTXDistributed.set(new Boolean(flag));
 +    }
 +  }
 +
 +  /*
 +   * If explicitly set using setDistributed, this returns that value.
 +   * If not, it returns the value of gemfire property "distributed-transactions" if set.
 +   * If this is also not set, it returns the default value of this property.
 +   */
 +  public boolean isDistributed() {
 +    
 +     Boolean value = isTXDistributed.get();
 +    // This can be null if not set in setDistributed().
 +    if (value == null) {
 +      return InternalDistributedSystem.getAnyInstance().getOriginalConfig().getDistributedTransactions();
 +    } else {
 +      return value.booleanValue();
 +    }
 +  }
 +  
 +}