Posted to commits@trafodion.apache.org by sa...@apache.org on 2017/05/12 20:25:39 UTC

[04/22] incubator-trafodion git commit: This is a large contribution of changes from Esgyn TransactionManager and libraries that are collectively much better tested and hardened than Trafodion, but are too numerous and complex to cherry pick individually

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java
index 31b35d4..dfd06a9 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java
@@ -84,7 +84,7 @@ public class SsccTransactionState extends TransactionState{
     public SsccTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId,
     		final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging, long SsccSequenceId) {
 
-       super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging, /* region TX */ false);
+       super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging,SsccSequenceId, /* region TX */ false);
        setStartId(SsccSequenceId);
        if(LOG.isTraceEnabled()) LOG.trace("SsccTransactionState : new state object for transid: " + transactionId + " with sequence: " + SsccSequenceId + " complete");
     }
@@ -93,7 +93,7 @@ public class SsccTransactionState extends TransactionState{
             final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging, long SsccSequenceId,
             boolean regionTx) {
 
-        super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging, regionTx);
+        super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging,SsccSequenceId,regionTx);
         setStartId(SsccSequenceId);
         if(LOG.isTraceEnabled()) LOG.trace("SsccTransactionState : new state object for transid: " + transactionId + " with sequence: " + SsccSequenceId + " complete");
     }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
index a7eae1f..e48062a 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
@@ -72,6 +72,9 @@ public class TransactionState {
 
     protected static final Log LOG = LogFactory.getLog(TransactionState.class);
 
+    protected long startId_;
+    protected long commitSequenceId;
+
     /** Current commit progress */
     public enum CommitProgress {
         /** Initial status, still performing operations. */
@@ -88,6 +91,10 @@ public class TransactionState {
          * Checked if we can commit, and said yes. Still need to determine the global decision.
          */
         COMMIT_PENDING,
+        /**
+         * Checked if we can commit, and writeOrdering is empty.
+         */
+        COMMIT_READONLY,
         /** Committed. */
         COMMITED,
         /** Aborted. */
@@ -106,7 +113,10 @@ public class TransactionState {
     protected long controlPointEpochAtPrepare = 0;
     protected int reInstated = 0;
     protected long flushTxId = 0;
+    protected long nodeId;
+    protected long clusterId;
 
+    protected boolean neverReadOnly = false;
     protected boolean splitRetry = false;
     protected boolean earlyLogging = false;
     protected boolean commit_TS_CC = false;
@@ -124,7 +134,8 @@ public class TransactionState {
     public static final byte TS_TRAFODION_TXN_TAG_TYPE = 41;
 
     public TransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId, final HRegionInfo regionInfo,
-                                                 HTableDescriptor htd, WAL hLog, boolean logging, boolean isRegionTx) {
+                                                 HTableDescriptor htd, WAL hLog, boolean logging, final long startId,
+                                                 boolean isRegionTx) {
         Tag transactionalTag = null;
         if (LOG.isTraceEnabled()) LOG.trace("Create TS object for " + transactionId + " early logging " + logging);
         this.transactionId = transactionId;
@@ -136,14 +147,18 @@ public class TransactionState {
         this.tabledescriptor = htd;
         this.earlyLogging = logging;
         this.tHLog = hLog;
+        setStartId(startId);
+        setNodeId();
+        setClusterId();
+
         if(isRegionTx){ // RegionTX takes precedence
-           transactionalTag = this.formTransactionalContextTag(TS_REGION_TX_COMMIT_REQUEST);
+           transactionalTag = this.formTransactionalContextTag(TS_REGION_TX_COMMIT_REQUEST, startId);
         }
         else if (earlyLogging) {
-           transactionalTag = this.formTransactionalContextTag(TS_ACTIVE);
+           transactionalTag = this.formTransactionalContextTag(TS_ACTIVE, startId);
         }
         else {
-           transactionalTag = this.formTransactionalContextTag(TS_COMMIT_REQUEST);
+           transactionalTag = this.formTransactionalContextTag(TS_COMMIT_REQUEST, startId);
         }
         tagList.add(transactionalTag);
     }
@@ -172,14 +187,15 @@ public class TransactionState {
        return result;
     }
 
-    public Tag formTransactionalContextTag(int transactionalOp) {
+    public Tag formTransactionalContextTag(int transactionalOp, long ts) {
         byte[] tid = Bytes.toBytes (this.transactionId);
         byte[] logSeqId = Bytes.toBytes(this.hLogStartSequenceId);
         byte[] type = Bytes.toBytes(transactionalOp);
         int vers = 1;
         byte[] version = Bytes.toBytes(vers);
+        byte[] tsId = Bytes.toBytes(ts);
 
-        byte[] tagBytes = concat(version, type, tid, logSeqId);
+        byte[] tagBytes = concat(version, type, tid, logSeqId, tsId);
         byte tagType = TS_TRAFODION_TXN_TAG_TYPE;
         Tag tag = new Tag(tagType, tagBytes);
         return tag;
@@ -197,6 +213,18 @@ public class TransactionState {
             }
         }
     }
+
+   // Same as updateLatestTimestamp except there is no test for isLatestTimestamp()
+   public  static void unconditionalUpdateLatestTimestamp(final Collection<List<Cell>> kvsCollection, final long time) {
+       byte[] timeBytes = Bytes.toBytes(time);
+       // HAVE to manually set the KV timestamps
+       for (List<Cell> kvs : kvsCollection) {
+           for (Cell cell : kvs) {
+             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+             kv.updateLatestStamp(timeBytes);
+           }
+       }
+   }
    /**
     * Returns a boolean indicating whether or not this is a region transaction.
     *
@@ -214,7 +242,7 @@ public class TransactionState {
      */
     public long getNodeId() {
 
-        return ((transactionId >> 32) & 0xFFL);
+        return nodeId;
     }
 
     /**
@@ -226,6 +254,15 @@ public class TransactionState {
 
         return ((transId >> 32) & 0xFFL);
     }
+
+    /**
+     * Set the originating node of the transaction.
+     *
+     */
+    private void setNodeId() {
+       nodeId = ((transactionId >> 32) & 0xFFL);
+    }
+
     /**
      * Get the originating cluster of the passed in transaction.
      *
@@ -233,9 +270,27 @@ public class TransactionState {
      */
     public static long getClusterId(long transId) {
 
-        return ((transId >> 48) & 0xFFL);
+        return (transId >> 48);
     }
 
+    /**
+     * Get the originating cluster of the passed in transaction.
+     *
+     * @return Return the clusterId.
+     */
+    public long getClusterId() {
+
+        return clusterId;
+    }
+
+    /**
+     * Set the originating clusterId of the passed in transaction.
+     *
+     */
+    private void setClusterId() {
+
+        clusterId = (transactionId >> 48);
+    }
 
     /**
      * Get the status.
@@ -250,6 +305,14 @@ public class TransactionState {
       return logSeqId.get();
     }
 
+    public void setNeverReadOnly(boolean value) {
+      neverReadOnly = value;
+    }
+
+    public boolean getNeverReadOnly() {
+      return neverReadOnly;
+    }
+
     public void setSplitRetry(boolean value) {
       splitRetry = value;
     }
@@ -278,6 +341,33 @@ public class TransactionState {
        return xaOperation;
     }
 
+    public void setStartId(long startId)
+    {
+        startId_ = startId;
+    }
+
+    public long getStartId()
+    {
+        return startId_;
+    }
+
+    /**
+     * Get the commitId for this transaction.
+     * 
+     * @return Return the commitSequenceId.
+     */
+    public synchronized long getCommitId() {
+        return commitSequenceId;
+    }
+
+    /**
+     * Set the commitId for this transaction.
+     * 
+     */
+    public synchronized void setCommitId(final long Id) {
+        this.commitSequenceId = Id;
+    }
+
     /**
      * Get the CP epoch at Prepare.
      * 
@@ -381,7 +471,7 @@ public class TransactionState {
     @Override
     public String toString() {
         return "transactionId: " + transactionId + ", regionTX: " + getIsRegionTx()
-                + ", status: " + status + ", regionInfo: " + regionInfo;
+                + ", status: " + status + ", neverReadOnly " + neverReadOnly + ", regionInfo: " + regionInfo;
     }
 
 

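The new nodeId and clusterId members simply cache values decoded from the 64-bit transaction id: bits 32-39 identify the originating node, and the high 16 bits identify the originating cluster (the static getClusterId(long) above no longer masks with 0xFF). A minimal, self-contained sketch of that decoding; the class and method names here are illustrative only, not part of the patch:

// Illustrative only: mirrors setNodeId(), setClusterId() and getClusterId(long) in the diff above.
public final class TransIdBits {

    // Bits 32..39 of the transaction id carry the originating node.
    static long nodeId(long transId) {
        return (transId >> 32) & 0xFFL;
    }

    // The high 16 bits carry the originating cluster (no 0xFF mask any more).
    static long clusterId(long transId) {
        return transId >> 48;
    }

    public static void main(String[] args) {
        long transId = (3L << 48) | (7L << 32) | 42L;            // cluster 3, node 7, low word 42
        System.out.println("nodeId="    + nodeId(transId));      // prints 7
        System.out.println("clusterId=" + clusterId(transId));   // prints 3
    }
}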
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java
index c796c2c..b4096f0 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java
@@ -25,7 +25,7 @@ package org.apache.hadoop.hbase.regionserver.transactional;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 
 /**
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
  */
  public class TransactionalRegionScannerHolder {
     public RegionScanner s;
-    public HRegion r;
+    public Region r;
     public long nextCallSeq;
     public long numberOfRows;
     public long rowsRemaining;
@@ -41,10 +41,11 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
     public long scannerId;
     public boolean hasMore;
 
+
     public TransactionalRegionScannerHolder(long transId,
                                             long scannerId,
                                             RegionScanner s, 
-                                            HRegion r) {
+                                            Region r) {
       this.transId = transId;
       this.scannerId = scannerId;
       this.s = s;

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl
index 6ec061d..131d1ee 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl
@@ -76,12 +76,18 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
 #endif
 #ifdef CDH5.7 APACHE1.2
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.conf.Configuration;
 #endif
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.io.DataInputBuffer;
+#ifdef CDH5.7 APACHE1.2
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+#endif
 
 /**
  * Holds the state of a transaction. This includes a buffer of all writes, a record of all reads / scans, and
@@ -90,29 +96,35 @@ import org.apache.hadoop.io.DataInputBuffer;
 public class TrxTransactionState extends TransactionState {
 
     static java.lang.reflect.Constructor c1_0 = null;
+#ifdef CDH5.7 APACHE1.2
+    static Configuration config = HBaseConfiguration.create();
+#endif
 
     static {
         String version = VersionInfo.getVersion();// the hbase version string, eg. "0.6.3-dev"
         LOG.info("Got info of Class ScanQueryMatcher for HBase version :" + version);
-        try {
-            NavigableSet<byte[]> lv_nvg = (NavigableSet<byte[]>) null;
-            c1_0 = ScanQueryMatcher.class.getConstructor(new Class[] { Scan.class, 
-                                                                                                                  ScanInfo.class,
-                                                                                                                  java.util.NavigableSet.class,
-                                                                                                                  ScanType.class,
-                                                                                                                  long.class,
-                                                                                                                  long.class,
-                                                                                                                  long.class,
-                                                                                                                  long.class,
-                                                                                                                  RegionCoprocessorHost.class });
-            if (c1_0 != null)
-                LOG.info("Got info of Class ScanQueryMatcher for HBase 1.0");
-
-        } catch (NoSuchMethodException exc_nsm) {
-            LOG.info("TrxRegionEndpoint coprocessor, No matching ScanQueryMatcher : Threw an exception");
-        }
-    }
-
+         try {
+             NavigableSet<byte[]> lv_nvg = (NavigableSet<byte[]>) null;
+             c1_0 = ScanQueryMatcher.class.getConstructor(
+                                   new Class [] {
+                                       Scan.class,
+                                       ScanInfo.class,
+                                       java.util.NavigableSet.class,
+                                       ScanType.class,
+                                       long.class,
+                                       long.class,
+                                       long.class,
+                                       long.class,
+                                       RegionCoprocessorHost.class
+                                   });
+             if (c1_0 != null)
+                 LOG.info("Got info of Class ScanQueryMatcher for HBase 1.0");
+             
+             }
+             catch (NoSuchMethodException exc_nsm) {
+                 LOG.info("TrxRegionEndpoint coprocessor, No matching ScanQueryMatcher : Threw an exception");
+             }
+         }
     /**
      * Simple container of the range of the scanners we've opened. Used to check for conflicting writes.
      */
@@ -155,9 +167,19 @@ public class TrxTransactionState extends TransactionState {
     private Set<TrxTransactionState> transactionsToCheck = Collections.synchronizedSet(new HashSet<TrxTransactionState>());
     private WALEdit e;
     private boolean dropTableRecorded;
+    private HRegion h_Region = null;
+    public long prepareEditSize = 0;
+    public long endEditSize = 0;
 
     public TrxTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId,
-            final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging) {
+            final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, 
+#ifdef CDH5.7 APACHE1.2
+									  boolean logging, HRegion hRegion,
+#else									  
+									  boolean logging,
+#endif									  
+			       final long startId) {
+
        super(transactionId,
              rLogStartSequenceId,
              hlogSeqId,
@@ -165,16 +187,29 @@ public class TrxTransactionState extends TransactionState {
              htd,
              hLog,
              logging,
+             startId,
              false);  // not a region transaction
         this.e = new WALEdit();
         dropTableRecorded = false;
+#ifdef CDH5.7 APACHE1.2	
+	this.h_Region = hRegion;
+#endif	
     }
 
     public TrxTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId,
-            final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging, boolean regionTx) {
-        super(transactionId, rLogStartSequenceId, hlogSeqId, regionInfo, htd, hLog, logging, regionTx);
+            final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog,
+#ifdef CDH5.7 APACHE1.2
+									  boolean logging, HRegion hRegion,
+#else									  
+									  boolean logging,
+#endif									  			       
+			       final long startId, boolean regionTx) {
+        super(transactionId, rLogStartSequenceId, hlogSeqId, regionInfo, htd, hLog, logging, startId, regionTx);
         this.e = new WALEdit();
         dropTableRecorded = false;
+#ifdef CDH5.7 APACHE1.2	
+	this.h_Region = hRegion;
+#endif	
     }
 
     public void setDropTableRecorded(boolean dropTableRecord) {
@@ -195,53 +230,97 @@ public class TrxTransactionState extends TransactionState {
         scans.add(new ScanRange(rowKey, rowKey));
     }
 
-    public synchronized void addWrite(final Put write) {
-        if (LOG.isTraceEnabled())
-            LOG.trace("addWrite -- ENTRY: write: " + write.toString());
+    public synchronized void addWrite(final Put put, final boolean useStartId) {
+        if (!this.getStatus().equals(Status.PENDING)) { // Active
+            LOG.error("addWrite late checkin for transaction " + this);
+        }
+
+#ifdef CDH5.7 APACHE1.2	    
+        MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
+        long mvccNum = 0;
+#endif
+        if (LOG.isTraceEnabled()) LOG.trace("addWrite -- ENTRY: useStartId: " + useStartId + " put: " + put.toString());
         WriteAction waction;
         KeyValue kv;
         WALEdit e1 = new WALEdit();
-        updateLatestTimestamp(write.getFamilyCellMap().values(), EnvironmentEdgeManager.currentTime());
+        // If useStartId is true we use the startId as the timestamp,
+        // otherwise we use the current timestamp
+        long now;
+        if (useStartId == true) {
+           now = getStartId();
+           unconditionalUpdateLatestTimestamp(put.getFamilyCellMap().values(), now);
+        }
+        else {
+           now = EnvironmentEdgeManager.currentTime();
+           updateLatestTimestamp(put.getFamilyCellMap().values(), now);
+        }
         // Adding read scan on a write action
-        addRead(new WriteAction(write).getRow());
+        addRead(new WriteAction(put).getRow());
 
         ListIterator<WriteAction> writeOrderIter = writeOrdering.listIterator(writeOrdering.size());
-        writeOrderIter.add(waction = new WriteAction(write));
+        writeOrderIter.add(waction = new WriteAction(put));
 
         if (this.earlyLogging) { // immediately write edit out to HLOG during DML (active transaction state)
-            for (Cell value : waction.getCells()) {
-                // KeyValue kv = KeyValueUtil.ensureKeyValue(value);
-                kv = KeyValue.cloneAndAddTags(value, tagList);
-                // if (LOG.isTraceEnabled()) LOG.trace("KV hex dump " + Hex.encodeHexString(kv.getValueArray()
-                // /*kv.getBuffer()*/));
-                e1.add(kv);
-                e.add(kv);
+           for (Cell value : waction.getCells()) {
+             //KeyValue kv = KeyValueUtil.ensureKeyValue(value);
+             kv = KeyValue.cloneAndAddTags(value, tagList);
+             if (LOG.isTraceEnabled()) LOG.trace("addWrite kv hex dump " + Hex.encodeHexString(kv.getValueArray() /*kv.getBuffer()*/));
+             e1.add(kv);
+             e.add(kv);
             }
-            try {
-                // long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
-                // e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
-                // this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
-                final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), this.regionInfo.getTable(),
-                        EnvironmentEdgeManager.currentTime());
+           try {
+           //long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
+           //         e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
+           //         this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
+
 #ifdef CDH5.7 APACHE1.2
-                long txid = this.tHLog.append(this.tabledescriptor, this.regionInfo, wk, e1, false);
+		WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), 
+					     this.regionInfo.getTable(), 
+					     WALKey.NO_SEQUENCE_ID,
+					     EnvironmentEdgeManager.currentTime(),
+					     WALKey.EMPTY_UUIDS,
+					     HConstants.NO_NONCE,
+					     HConstants.NO_NONCE,
+					     this.h_Region.getMVCC());		
+		long txid = this.tHLog.append(this.tabledescriptor,this.regionInfo, wk, e1, false);
+		
+		writeEntry = wk.getWriteEntry();
+		mvccNum = writeEntry.getWriteNumber();
+		
+		if (writeEntry != null) {
+		    this.h_Region.getMVCC().completeAndWait(writeEntry);
+		    writeEntry = null;
+		}
+
 #else
-                long txid = this.tHLog.append(this.tabledescriptor, this.regionInfo, wk, e1, this.logSeqId, false, null);
+           	final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), this.regionInfo.getTable(), now);
+		long txid = this.tHLog.append(this.tabledescriptor,this.regionInfo, wk , e1,	this.logSeqId, false, null);
 #endif
-                // if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y11 write edit to HLOG during put with txid
-                // " + txid + " ts flush id " + this.flushTxId);
-                if (txid > this.flushTxId)
-                    this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase
-                                           // 1
-            } catch (IOException exp1) {
-                LOG.info("TrxRegionEndpoint coprocessor addWrite writing to HLOG for early logging: Threw an exception");
-                // throw exp1;
-            }
-        } else { // edits are buffered in ts and written out to HLOG in phase 1
-            for (Cell value : waction.getCells()) {
-                kv = KeyValue.cloneAndAddTags(value, tagList);
-                e.add(kv);
-            }
+           //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y11 write edit to HLOG during put with txid " + txid + " ts flush id " + this.flushTxId);
+           if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1
+           }
+           catch (IOException exp1) {
+           LOG.info("TrxRegionEndpoint coprocessor addWrite writing to HLOG for early logging: Threw an exception");
+           //throw exp1;
+           }
+#ifdef CDH5.7 APACHE1.2    
+	    finally {
+	       if (writeEntry != null) {
+                   this.h_Region.getMVCC().completeAndWait(writeEntry);
+                   writeEntry = null;
+	       }	  
+	    } // finally
+#endif           
+        }
+        else { // edits are buffered in ts and written out to HLOG in phase 1
+               for (Cell value : waction.getCells()) {
+                    kv = KeyValue.cloneAndAddTags(value, tagList);
+                    if (LOG.isTraceEnabled()) LOG.trace("addWrite kv hex dump "
+                          + " key: " + Hex.encodeHexString(Bytes.toBytes(kv.getKeyString())) 
+                          + " timestamp: " + kv.getTimestamp() 
+                          + Hex.encodeHexString(kv.getValueArray() /*kv.getBuffer()*/));
+                    e.add(kv);
+                }
         }
         if (LOG.isTraceEnabled())
             LOG.trace("addWrite -- EXIT");
@@ -255,16 +334,32 @@ public class TrxTransactionState extends TransactionState {
         return writeOrdering.size();
     }
 
-    public synchronized void addDelete(final Delete delete) {
-        if (LOG.isTraceEnabled())
-            LOG.trace("addDelete -- ENTRY: delete: " + delete.toString());
+    public synchronized void addDelete(final Delete delete, final boolean useStartId) {
+       if (!this.getStatus().equals(Status.PENDING)) { // Active
+          LOG.error("addDelete late checkin for transaction " + this);
+       }
+#ifdef CDH5.7 APACHE1.2	    
+        MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
+        long mvccNum = 0;
+#endif
+        if (LOG.isTraceEnabled()) LOG.trace("addDelete -- ENTRY: useStartId: " + useStartId + " delete: " + delete.toString());
 
         WriteAction waction;
-        WALEdit e1 = new WALEdit();
-        long now = EnvironmentEdgeManager.currentTime();
-        updateLatestTimestamp(delete.getFamilyCellMap().values(), now);
-        if (delete.getTimeStamp() == HConstants.LATEST_TIMESTAMP) {
-            delete.setTimestamp(now);
+	KeyValue kv;
+        WALEdit e1  = new WALEdit();
+        // If useStartId is true we use the startId as the timestamp,
+        // otherwise we use the current timestamp
+        long now;
+        if (useStartId == true) {
+           now = getStartId();
+           unconditionalUpdateLatestTimestamp(delete.getFamilyCellMap().values(), now);
+        }
+        else {
+           now = EnvironmentEdgeManager.currentTime();
+           updateLatestTimestamp(delete.getFamilyCellMap().values(), now);
+           if (delete.getTimeStamp() == HConstants.LATEST_TIMESTAMP) {
+              delete.setTimestamp(now);
+           }
         }
         deletes.add(delete);
 
@@ -272,40 +367,63 @@ public class TrxTransactionState extends TransactionState {
         writeOrderIter.add(waction = new WriteAction(delete));
 
         if (this.earlyLogging) {
-            for (Cell value : waction.getCells()) {
-                KeyValue kv = KeyValue.cloneAndAddTags(value, tagList);
-                e1.add(kv);
-                e.add(kv);
-            }
-            try {
-                // long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
-                // e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
-                // this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
-                final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), this.regionInfo.getTable(),
-                        EnvironmentEdgeManager.currentTime());
+           for (Cell value : waction.getCells()) {
+               kv = KeyValue.cloneAndAddTags(value, tagList);
+               e1.add(kv);
+               e.add(kv);
+           }
+           try {
+           //long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
+           //         e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
+           //         this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
 
 #ifdef CDH5.7 APACHE1.2
-                long txid = this.tHLog.append(this.tabledescriptor, this.regionInfo, wk, e1, false);
+		WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), 
+					     this.regionInfo.getTable(), 
+					     WALKey.NO_SEQUENCE_ID,
+					     EnvironmentEdgeManager.currentTime(),
+					     WALKey.EMPTY_UUIDS,
+					     HConstants.NO_NONCE,
+					     HConstants.NO_NONCE,
+					     this.h_Region.getMVCC());		
+		long txid = this.tHLog.append(this.tabledescriptor,this.regionInfo, wk, e1, false);
+		
+		writeEntry = wk.getWriteEntry();
+		mvccNum = writeEntry.getWriteNumber();
+		
+		if (writeEntry != null) {
+		    this.h_Region.getMVCC().completeAndWait(writeEntry);
+		    writeEntry = null;
+		}
+
 #else
-                long txid = this.tHLog.append(this.tabledescriptor, this.regionInfo, wk, e1, this.logSeqId, false, null);
+           	final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), this.regionInfo.getTable(), now);
+		long txid = this.tHLog.append(this.tabledescriptor,this.regionInfo, wk , e1,	this.logSeqId, false, null);
 #endif
-                // if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y00 write edit to HLOG during delete with
-                // txid " + txid + " ts flush id " + this.flushTxId);
-                if (txid > this.flushTxId)
-                    this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase
-                                           // 1
-            } catch (IOException exp1) {
-                LOG.info("TrxRegionEndpoint coprocessor addDelete writing to HLOG for early logging: Threw an exception");
-            }
-        } else {
-            for (Cell value : waction.getCells()) {
-                KeyValue kv = KeyValue.cloneAndAddTags(value, tagList);
-                e.add(kv);
-            }
-        }
 
-        if (LOG.isTraceEnabled())
-            LOG.trace("addDelete -- EXIT");
+                    //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y00 write edit to HLOG during delete with txid " + txid + " ts flush id " + this.flushTxId);
+           if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1
+           }
+           catch (IOException exp1) {
+           LOG.info("TrxRegionEndpoint coprocessor addDelete writing to HLOG for early logging: Threw an exception");
+           }
+#ifdef CDH5.7 APACHE1.2    
+	    finally {
+	       if (writeEntry != null) {
+                   this.h_Region.getMVCC().completeAndWait(writeEntry);
+                   writeEntry = null;
+	       }	  
+	    } // finally
+#endif           
+       }
+       else {
+               for (Cell value : waction.getCells()) {
+                    kv = KeyValue.cloneAndAddTags(value, tagList);
+                    e.add(kv);
+                } // all cells
+        } // not early logging
+    
+       if (LOG.isTraceEnabled()) LOG.trace("addDelete -- EXIT");
     }
 
     public synchronized void applyDeletes(final List<Cell> input, final long minTime, final long maxTime) {
@@ -512,12 +630,20 @@ public class TrxTransactionState extends TransactionState {
         result.append(getIsRegionTx());
         result.append(" status: ");
         result.append(status.name());
+        result.append(" neverReadOnly: ");
+        result.append(getNeverReadOnly());
         result.append(" scan Size: ");
         result.append(scans.size());
         result.append(" write Size: ");
         result.append(getWriteOrdering().size());
         result.append(" startSQ: ");
         result.append(startSequenceNumber);
+        result.append(" prepareEditSize: ");
+        result.append(prepareEditSize);
+        result.append(" endEditSize: ");
+        result.append(endEditSize);
+        result.append(" editSize: ");
+        result.append(getEdit().getCells().size());
         if (sequenceNumber != null) {
             result.append(" commitedSQ:");
             result.append(sequenceNumber);
@@ -541,9 +667,9 @@ public class TrxTransactionState extends TransactionState {
      * 
      * @return deletes
      */
-    public synchronized List<Delete> getDeletes() {
-        return deletes;
-    }
+    //public synchronized List<Delete> getDeletes() {
+    //    return deletes;
+    //}
 
     /**
      * Get a scanner to go through the puts and deletes from this transaction. Used to weave together the local trx puts
@@ -601,7 +727,8 @@ public class TrxTransactionState extends TransactionState {
 
     @SuppressWarnings("deprecation")
     private synchronized KeyValue[] getAllKVs(final Scan scan) {
-        // if (LOG.isTraceEnabled()) LOG.trace("getAllKVs -- ENTRY");
+        if (LOG.isTraceEnabled()) LOG.trace("getAllKVs -- ENTRY for transId " + this.getTransactionId()
+            + " with writeOrdering size " + this.getWriteOrdering().size());
         List<Cell> kvList = new ArrayList<Cell>();
 
         ListIterator<WriteAction> writeOrderIter = null;
@@ -694,7 +821,7 @@ public class TrxTransactionState extends TransactionState {
             
             //Store.ScanInfo scaninfo = new Store.ScanInfo(null, 0, 1, HConstants.FOREVER, false, 0, Cell.COMPARATOR);
 #ifdef CDH5.7 APACHE1.2
-            ScanInfo scaninfo = new ScanInfo(HBaseConfiguration.create(), null, 0, 1, HConstants.FOREVER,KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
+            ScanInfo scaninfo = new ScanInfo(getConfig(), null, 0, 1, HConstants.FOREVER,KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
 #else
             ScanInfo scaninfo = new ScanInfo(null, 0, 1, HConstants.FOREVER,KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
 #endif
@@ -1003,12 +1130,11 @@ public class TrxTransactionState extends TransactionState {
     }
 
     /**
-     * before adding a put to the writeOrdering list, check whether we have deleted a row with the same key. If true
-     * remove the delete before adding the put
+     * before adding a put to the writeOrdering list, check whether we have deleted a row with the
+     * same key.  If true remove the delete before adding the put
      */
     public void removeDelBeforePut(Put put) {
-        if (LOG.isTraceEnabled())
-            LOG.trace("removeDelBeforePut put : " + put);
+        if (LOG.isTraceEnabled()) LOG.trace("removeDelBeforePut put : " + put);
         byte[] putRow = put.getRow();
         KeyValue kv;
 
@@ -1032,4 +1158,9 @@ public class TrxTransactionState extends TransactionState {
             }
         }
     }
+#ifdef CDH5.7 APACHE1.2
+    public Configuration getConfig() {
+      return config;
+    }
+#endif
 }

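addWrite() and addDelete() now take a useStartId flag: when it is set, every cell in the mutation is stamped with the transaction's startId through the new unconditionalUpdateLatestTimestamp() helper added in TransactionState.java above, while the old path keeps wall-clock time and, per that helper's comment, only stamps cells still carrying LATEST_TIMESTAMP. A pure-Java model of the difference, with plain long arrays standing in for HBase cells (a sketch, not the actual helper code):

import java.util.Arrays;

public final class StampingModel {

    static final long LATEST_TIMESTAMP = Long.MAX_VALUE;   // stands in for HConstants.LATEST_TIMESTAMP

    // Model of updateLatestTimestamp(): only cells still at LATEST_TIMESTAMP are stamped.
    static void updateLatest(long[] cellTimestamps, long now) {
        for (int i = 0; i < cellTimestamps.length; i++) {
            if (cellTimestamps[i] == LATEST_TIMESTAMP) {
                cellTimestamps[i] = now;
            }
        }
    }

    // Model of unconditionalUpdateLatestTimestamp(): every cell is stamped, no isLatestTimestamp() test.
    static void unconditionalUpdateLatest(long[] cellTimestamps, long now) {
        Arrays.fill(cellTimestamps, now);
    }

    public static void main(String[] args) {
        long startId = 42L;                                  // the transaction's startId
        long[] conditional   = { LATEST_TIMESTAMP, 7L };
        long[] unconditional = { LATEST_TIMESTAMP, 7L };
        updateLatest(conditional, startId);                  // [42, 7]  -- an explicit timestamp survives
        unconditionalUpdateLatest(unconditional, startId);   // [42, 42] -- every cell takes the startId
        System.out.println(Arrays.toString(conditional) + " vs " + Arrays.toString(unconditional));
    }
}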
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
index a0a1385..a69e6b8 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
@@ -36,6 +36,7 @@ message AbortTransactionRequest{
   required int64 transactionId = 2;
   required int32 participantNum = 3;
   required bool dropTableRecorded = 4;
+  optional bool ignoreUnknownTransactionException = 5;
 }
 
 message AbortTransactionResponse {
@@ -56,7 +57,8 @@ message AbortTransactionMultipleResponse {
 
 message BeginTransactionRequest{
   required int64 transactionId = 1;
-  required bytes regionName = 2;
+  required int64 startId = 2;
+  required bytes regionName = 3;
 }
 
 message BeginTransactionResponse {
@@ -67,8 +69,9 @@ message BeginTransactionResponse {
 message CommitRequest{
   required bytes regionName = 1;
   required int64 transactionId = 2;
-  required int32 participantNum = 3;
-  optional bool ignoreUnknownTransactionException = 4;
+  required int64 commitId = 3;
+  required int32 participantNum = 4;
+  optional bool ignoreUnknownTransactionException = 5;
 }
 
 message CommitResponse {
@@ -79,8 +82,9 @@ message CommitResponse {
 message CommitMultipleRequest{
   repeated bytes regionName = 1;
   required int64 transactionId = 2;
-  required int32 participantNum = 3;
-  optional bool ignoreUnknownTransactionException = 4;
+  required int64 commitId = 3;
+  required int32 participantNum = 4;
+  optional bool ignoreUnknownTransactionException = 5;
 }
 
 message CommitMultipleResponse {
@@ -130,12 +134,13 @@ message CommitIfPossibleResponse {
 
 message CheckAndDeleteRequest {
   required int64 transactionId = 1;
-  required bytes regionName = 2;
-  required bytes row = 3;
-  required bytes family = 4;
-  required bytes qualifier = 5;
-  required bytes value = 6;
-  required MutationProto delete = 7;
+  required int64 startId = 2;
+  required bytes regionName = 3;
+  required bytes row = 4;
+  required bytes family = 5;
+  required bytes qualifier = 6;
+  required bytes value = 7;
+  required MutationProto delete = 8;
 }
 
 message CheckAndDeleteResponse {
@@ -146,13 +151,14 @@ message CheckAndDeleteResponse {
 
 message CheckAndDeleteRegionTxRequest {
   required int64 tid = 1;
-  required bytes regionName = 2;
-  required bytes row = 3;
-  required bytes family = 4;
-  required bytes qualifier = 5;
-  required bytes value = 6;
-  required MutationProto delete = 7;
-  required bool autoCommit = 8;
+  required int64 commitId = 2;
+  required bytes regionName = 3;
+  required bytes row = 4;
+  required bytes family = 5;
+  required bytes qualifier = 6;
+  required bytes value = 7;
+  required MutationProto delete = 8;
+  required bool autoCommit = 9;
 }
 
 message CheckAndDeleteRegionTxResponse {
@@ -163,12 +169,13 @@ message CheckAndDeleteRegionTxResponse {
 
 message CheckAndPutRequest {
   required int64 transactionId = 1;
-  required bytes regionName = 2;
-  required bytes row = 3;
-  required bytes family = 4;
-  required bytes qualifier = 5;
-  required bytes value = 6;
-  required MutationProto put = 7;
+  required int64 startId = 2;
+  required bytes regionName = 3;
+  required bytes row = 4;
+  required bytes family = 5;
+  required bytes qualifier = 6;
+  required bytes value = 7;
+  required MutationProto put = 8;
 }
 
 message CheckAndPutResponse {
@@ -179,13 +186,14 @@ message CheckAndPutResponse {
 
 message CheckAndPutRegionTxRequest {
   required int64 tid = 1;
-  required bytes regionName = 2;
-  required bytes row = 3;
-  required bytes family = 4;
-  required bytes qualifier = 5;
-  required bytes value = 6;
-  required MutationProto put = 7;
-  required bool autoCommit = 8;
+  required int64 commitId = 2;
+  required bytes regionName = 3;
+  required bytes row = 4;
+  required bytes family = 5;
+  required bytes qualifier = 6;
+  required bytes value = 7;
+  required MutationProto put = 8;
+  required bool autoCommit = 9;
 }
 
 message CheckAndPutRegionTxResponse {
@@ -207,8 +215,9 @@ message CloseScannerResponse {
 
 message DeleteMultipleTransactionalRequest {
   required int64 transactionId = 1;
-  required bytes regionName = 2;
-  repeated MutationProto delete = 3;
+  required int64 startId = 2;
+  required bytes regionName = 3;
+  repeated MutationProto delete = 4;
 }
 
 message DeleteMultipleTransactionalResponse {
@@ -219,9 +228,10 @@ message DeleteMultipleTransactionalResponse {
 
 message DeleteRegionTxRequest {
   required int64 tid = 1;
-  required bytes regionName = 2;
-  required MutationProto delete = 3;
-  required bool autoCommit = 4;
+  required int64 commitId = 2;
+  required bytes regionName = 3;
+  required MutationProto delete = 4;
+  required bool autoCommit = 5;
 }
 
 message DeleteRegionTxResponse {
@@ -232,8 +242,9 @@ message DeleteRegionTxResponse {
 
 message DeleteTransactionalRequest {
   required int64 transactionId = 1;
-  required bytes regionName = 2;
-  required MutationProto delete = 3;
+  required int64 startId = 2;
+  required bytes regionName = 3;
+  required MutationProto delete = 4;
 }
 
 message DeleteTransactionalResponse {
@@ -244,8 +255,9 @@ message DeleteTransactionalResponse {
 
 message GetTransactionalRequest {
   required int64 transactionId = 1;
-  required bytes regionName = 2;
-  required Get get = 3;
+  required int64 startId = 2;
+  required bytes regionName = 3;
+  required Get get = 4;
 }
 
 message GetTransactionalResponse {
@@ -256,8 +268,9 @@ message GetTransactionalResponse {
 
 message OpenScannerRequest {
   required int64 transactionId = 1;
-  required bytes regionName = 2;
-  required Scan scan = 3;
+  required int64 startId = 2;
+  required bytes regionName = 3;
+  required Scan scan = 4;
 }
 
 message OpenScannerResponse {
@@ -268,11 +281,12 @@ message OpenScannerResponse {
 
 message PerformScanRequest {
   required int64 transactionId = 1;
-  required bytes regionName = 2;
-  required int64 scannerId = 3;
-  required int32 numberOfRows = 4;
-  required bool  closeScanner = 5;
-  required int64 nextCallSeq = 6;
+  required int64 startId = 2;
+  required bytes regionName = 3;
+  required int64 scannerId = 4;
+  required int32 numberOfRows = 5;
+  required bool  closeScanner = 6;
+  required int64 nextCallSeq = 7;
 }
 
 message PerformScanResponse {
@@ -286,9 +300,10 @@ message PerformScanResponse {
 
 message PutRegionTxRequest {
   required int64 tid = 1;
-  required bytes regionName = 2;
-  required MutationProto put = 3;
-  required bool  autoCommit = 4;
+  required int64 commitId = 2;
+  required bytes regionName = 3;
+  required MutationProto put = 4;
+  required bool  autoCommit = 5;
 }
 
 message PutRegionTxResponse {
@@ -299,8 +314,9 @@ message PutRegionTxResponse {
 
 message PutTransactionalRequest {
   required int64 transactionId = 1;
-  required bytes regionName = 2;
-  required MutationProto put = 3;
+  required int64 startId = 2;
+  required bytes regionName = 3;
+  required MutationProto put = 4;
 }
 
 message PutTransactionalResponse {
@@ -311,8 +327,9 @@ message PutTransactionalResponse {
 
 message PutMultipleTransactionalRequest {
   required int64 transactionId = 1;
-  required bytes regionName = 2;
-  repeated MutationProto put = 3;
+  required int64 startId = 2;
+  required bytes regionName = 3;
+  repeated MutationProto put = 4;
 }
 
 message PutMultipleTransactionalResponse {
@@ -346,55 +363,45 @@ message RecoveryRequestResponse {
 
 message TlogDeleteRequest{
   required bytes regionName = 1;
-  required int64 transactionId = 2;
-  required Scan scan = 3;
-  required int64 auditSeqNum = 4;
-  required bool ageCommitted = 5;
+  required Scan scan = 2;
+  required int64 auditSeqNum = 3;
+  required bool ageCommitted = 4;
 }
 
 message TlogDeleteResponse {
   repeated Result result = 1;
   required int64  count = 2;
-  optional string exception = 3;
-  optional bool hasException = 4;
+  optional bool hasException = 3;
+  optional string exception = 4;
 }
 
 message TlogWriteRequest{
   required bytes regionName = 1;
   required int64 transactionId = 2;
   required MutationProto put = 3;
-  required bytes row = 4;
-  required bytes family = 5;
-  required bytes qualifier = 6;
-  required bytes value = 7;
-  required int64 commitId = 8;
-  optional bool forced = 9;
+  required bytes family = 4;
+  required bytes qualifier = 5;
+  required int64 commitId = 6;
+  optional bool forced = 7;
 }
 
 message TlogWriteResponse {
   repeated int64 result = 1;
-  optional string exception = 2;
-  optional bool hasException = 3;
+  optional bool  hasException = 2;
+  optional string exception = 3;
 }
 
 message TlogTransactionStatesFromIntervalRequest{
-  required bytes regionName = 1;
-  required int64 transactionId = 2;
-  required int64 clusterId = 3;
-  required int64 auditSeqNum =4;
-  required int64 scannerId = 5;
-  required int32 numberOfRows = 6;
-  required int64 nextCallSeq = 7;
-  required bool  closeScanner = 8;
+  required int64 clusterId = 1;
+  required int64 auditSeqNum =2;
+  required Scan scan = 3;
 }
 
 message TlogTransactionStatesFromIntervalResponse {
   repeated Result result = 1;
   required int64  count = 2;
-  required int64  nextCallSeq = 3;
-  required bool   hasMore = 4;
-  optional string exception = 5;
-  optional bool hasException = 6;
+  optional string exception = 3;
+  optional bool hasException = 4;
 }
 
 message TransactionalAggregateRequest {
@@ -406,9 +413,11 @@ message TransactionalAggregateRequest {
    */
   required bytes regionName = 1;
   required int64 transactionId = 2;
-  required string interpreter_class_name = 3;
-  required Scan scan = 4;
-  optional bytes  interpreter_specific_bytes = 5;
+  required int64 startId = 3;
+  required string interpreter_class_name = 4;
+  required Scan scan = 5;
+  optional bytes  interpreter_specific_bytes = 6;
+  
 }
 
 message TransactionalAggregateResponse {

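Most request messages gain a new required int64 (startId or commitId) as field 2, which renumbers every later field; since required fields are matched by field number on the wire, clients and coprocessors built against the old .proto cannot interoperate with the new one and must be rebuilt together. A hedged sketch of how a rebuilt caller might populate one of the revised messages, assuming standard protobuf-java codegen for this file with an outer class written here as TrxRegionProtos (the generated names are not shown in this patch):

import java.io.IOException;

import com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;

public final class PutRequestSketch {

    // TrxRegionProtos.PutTransactionalRequest is assumed to be the class protoc generates from
    // the PutTransactionalRequest message above; the setters follow its revised field list.
    static TrxRegionProtos.PutTransactionalRequest buildRequest(
            long transId, long startId, byte[] regionName, Put put) throws IOException {
        MutationProto mutation = ProtobufUtil.toMutation(MutationProto.MutationType.PUT, put);
        return TrxRegionProtos.PutTransactionalRequest.newBuilder()
                .setTransactionId(transId)                       // field 1, unchanged
                .setStartId(startId)                             // new required field 2
                .setRegionName(ByteString.copyFrom(regionName))  // now field 3
                .setPut(mutation)                                // now field 4
                .build();
    }
}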
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
index ba81304..a595d42 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
@@ -83,10 +83,14 @@ public class HBaseAuditControlPoint {
     private Configuration config;
     private static String CONTROL_POINT_TABLE_NAME;
     private static final byte[] CONTROL_POINT_FAMILY = Bytes.toBytes("cpf");
-    private static final byte[] ASN_HIGH_WATER_MARK = Bytes.toBytes("hwm");
-    private static HTable table;
+    private static final byte[] CP_NUM_AND_ASN_HWM = Bytes.toBytes("hwm");
+    private HTable table;
     private boolean useAutoFlush;
     private boolean disableBlockCache;
+    private static final int versions = 10;
+    private static int myClusterId = 0;
+    private int TlogRetryDelay;
+    private int TlogRetryCount;
 
     public HBaseAuditControlPoint(Configuration config, Connection connection) throws IOException {
       if (LOG.isTraceEnabled()) LOG.trace("Enter HBaseAuditControlPoint constructor()");
@@ -96,17 +100,46 @@ public class HBaseAuditControlPoint {
       HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(CONTROL_POINT_TABLE_NAME));
       HColumnDescriptor hcol = new HColumnDescriptor(CONTROL_POINT_FAMILY);
 
+      TlogRetryDelay = 3000; // 3 seconds
+      try {
+         String retryDelayS = System.getenv("TM_TLOG_RETRY_DELAY");
+         if (retryDelayS != null){
+            TlogRetryDelay = (Integer.parseInt(retryDelayS) > TlogRetryDelay ? Integer.parseInt(retryDelayS) : TlogRetryDelay);
+         }
+      }
+      catch (NumberFormatException e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_RETRY_DELAY is not valid in ms.env");
+      }
+
+      TlogRetryCount = 40;
+      try {
+         String retryCountS = System.getenv("TM_TLOG_RETRY_COUNT");
+         if (retryCountS != null){
+           TlogRetryCount = (Integer.parseInt(retryCountS) > TlogRetryCount ? Integer.parseInt(retryCountS) : TlogRetryCount);
+         }
+      }
+      catch (NumberFormatException e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_RETRY_COUNT is not valid in ms.env");
+      }
+
       disableBlockCache = false;
-      String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
-      if (blockCacheString != null){
-         disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
+      try {
+         String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
+         if (blockCacheString != null){
+             disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
          if (LOG.isDebugEnabled()) LOG.debug("disableBlockCache != null");
+         }
+      }
+      catch (NumberFormatException e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_DISABLE_BLOCK_CACHE is not valid in ms.env");
       }
       LOG.info("disableBlockCache is " + disableBlockCache);
       if (disableBlockCache) {
          hcol.setBlockCacheEnabled(false);
       }
 
+      hcol.setMaxVersions(versions);
+
       desc.addFamily(hcol);
 
       useAutoFlush = true;
@@ -120,7 +153,6 @@ public class HBaseAuditControlPoint {
       boolean lvControlPointExists = admin.tableExists(TableName.valueOf(CONTROL_POINT_TABLE_NAME));
       if (LOG.isDebugEnabled()) LOG.debug("HBaseAuditControlPoint lvControlPointExists " + lvControlPointExists);
       currControlPt = -1;
-      admin.close();
       if (lvControlPointExists == false) {
          try {
             if (LOG.isDebugEnabled()) LOG.debug("Creating the table " + CONTROL_POINT_TABLE_NAME);
@@ -128,7 +160,11 @@ public class HBaseAuditControlPoint {
             currControlPt = 1;
          }
          catch (TableExistsException e) {
-            LOG.error("Table " + CONTROL_POINT_TABLE_NAME + " already exists");
+            LOG.error("Table " + CONTROL_POINT_TABLE_NAME + " already exists", e);
+            throw new IOException(e);
+         }
+         finally{
+            admin.close();
          }
       }
       if (LOG.isDebugEnabled()) LOG.debug("try new HTable");
@@ -136,7 +172,8 @@ public class HBaseAuditControlPoint {
       table.setAutoFlushTo(this.useAutoFlush);
 
       if (currControlPt == -1){
-         currControlPt = getCurrControlPt();
+         if (LOG.isDebugEnabled()) LOG.debug("getting currControlPt for clusterId " + myClusterId);
+         currControlPt = getCurrControlPt(myClusterId);
       }
       if (LOG.isDebugEnabled()) LOG.debug("currControlPt is " + currControlPt);
 
@@ -144,167 +181,198 @@ public class HBaseAuditControlPoint {
       return;
     }
 
-   public long getCurrControlPt() throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("getCurrControlPt:  start");
-      long highKey = -1;
-      if (LOG.isDebugEnabled()) LOG.debug("new Scan");
-      Scan s = new Scan();
-      s.setCaching(10);
-      s.setCacheBlocks(false);
-      if (LOG.isDebugEnabled()) LOG.debug("resultScanner");
-      ResultScanner ss = table.getScanner(s);
-      try {
-         long currKey;
-         String rowKey;
-         if (LOG.isDebugEnabled()) LOG.debug("entering for loop" );
-         for (Result r : ss) {
-            rowKey = new String(r.getRow());
-            if (LOG.isDebugEnabled()) LOG.debug("rowKey is " + rowKey );
-            currKey = Long.parseLong(rowKey);
-            if (LOG.isDebugEnabled()) LOG.debug("value is " + Long.parseLong(Bytes.toString(r.value())));
-            if (currKey > highKey) {
-               if (LOG.isDebugEnabled()) LOG.debug("Setting highKey to " + currKey);
-               highKey = currKey;
+    public long getCurrControlPt(final int clusterId) throws IOException {
+       if (LOG.isTraceEnabled()) LOG.trace("getCurrControlPt:  start, clusterId " + clusterId);
+       long lvCpNum = 1;
+
+       Get g = new Get(Bytes.toBytes(clusterId));
+       if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt attempting table.get");
+          Result r = table.get(g);
+          if (r.isEmpty())
+             return lvCpNum;
+          if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt Result: " + r);
+          String value = new String(Bytes.toString(r.getValue(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM)));
+          // In theory the above value is the latest version of the column
+          if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt for clusterId: " + clusterId + ", valueString is " + value);
+          StringTokenizer stok = new StringTokenizer(value, ",");
+          if (stok.hasMoreElements()) {
+             if (LOG.isTraceEnabled()) LOG.trace("Parsing record in getCurrControlPt");
+             String ctrlPtToken = stok.nextElement().toString();
+             lvCpNum = Long.parseLong(ctrlPtToken, 10);
+             if (LOG.isTraceEnabled()) LOG.trace("Value for getCurrControlPt and clusterId: "
+                               + clusterId + " is: " + lvCpNum);
+          }
+       return lvCpNum;
+    }
+
+   public long putRecord(final int clusterId, final long ControlPt, final long startingSequenceNumber) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("putRecord clusterId: " + clusterId + ", startingSequenceNumber (" + startingSequenceNumber + ")");
+      Put p = new Put(Bytes.toBytes(clusterId));
+      p.add(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM, 
+    		  Bytes.toBytes(String.valueOf(ControlPt) + ","
+    	               + String.valueOf(startingSequenceNumber)));
+      boolean complete = false;
+      int retries = 0;
+      do {
+         try {
+       	    retries++;
+            if (LOG.isTraceEnabled()) LOG.trace("try table.put with cluster Id: " + clusterId + " and startingSequenceNumber " + startingSequenceNumber);
+            table.put(p);
+            if (useAutoFlush == false) {
+               if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record");
+               table.flushCommits();
+            }
+            complete = true;
+            if (retries > 1){
+               if (LOG.isTraceEnabled()) LOG.trace("Retry successful in putRecord for cp: " + ControlPt + " on table "
+                        + table.getName().getNameAsString());
             }
          }
-      } finally {
-         ss.close();
-      }
-      if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt returning " + highKey);
-      return highKey;
-   }
-
-   public long putRecord(final long ControlPt, final long startingSequenceNumber) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("putRecord starting sequence number ("  + String.valueOf(startingSequenceNumber) + ")");
-      String controlPtString = new String(String.valueOf(ControlPt));
-      Put p = new Put(Bytes.toBytes(controlPtString));
-      p.add(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK, Bytes.toBytes(String.valueOf(startingSequenceNumber)));
-      if (LOG.isTraceEnabled()) LOG.trace("try table.put with starting sequence number " + startingSequenceNumber);
-      table.put(p);
-      if (useAutoFlush == false) {
-         if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record");
-         table.flushCommits();
-      }
+         catch (IOException e){
+             LOG.error("Retrying putRecord on control point: " + ControlPt + " on control point table "
+                     + table.getName().getNameAsString() + " due to Exception " + e);
+//             locator.getRegionLocation(p.getRow(), true);
+             table.getRegionLocation(p.getRow(), true);
+             try {
+               Thread.sleep(TlogRetryDelay); // 3 second default
+             } catch (InterruptedException ie) {
+             }
+             if (retries == TlogRetryCount){
+                LOG.error("putRecord aborting due to excessive retries on control point table: "
+                         + table.getName().getNameAsString() + " due to Exception; aborting ");
+                System.exit(1);
+             }
+         }
+      } while (! complete && retries < TlogRetryCount);  // default give up after 5 minutes
       if (LOG.isTraceEnabled()) LOG.trace("HBaseAuditControlPoint:putRecord returning " + ControlPt);
       return ControlPt;
    }
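For reference, putRecord stores a single comma-separated value per clusterId row: the control point number followed by the ASN high-water mark, and the read paths below parse it back with a StringTokenizer. A minimal sketch of that round trip, assuming the Bytes and StringTokenizer imports already present in this file (the helper names encodeCpAndAsn/decodeCpAndAsn are hypothetical, not part of this class):

   // Hypothetical helpers illustrating the "controlPoint,asnHighWaterMark" cell format
   static byte[] encodeCpAndAsn(final long controlPt, final long asnHwm) {
      return Bytes.toBytes(String.valueOf(controlPt) + "," + String.valueOf(asnHwm));
   }

   static long[] decodeCpAndAsn(final byte[] value) {
      StringTokenizer stok = new StringTokenizer(Bytes.toString(value), ",");
      long cp  = Long.parseLong(stok.nextElement().toString(), 10);
      long asn = Long.parseLong(stok.nextElement().toString(), 10);
      return new long[] { cp, asn };
   }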
 
-   public ArrayList<String> getRecordList(String controlPt) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("getRecord");
-      ArrayList<String> transactionList = new ArrayList<String>();
-      Get g = new Get(Bytes.toBytes(controlPt));
-      Result r = table.get(g);
-      byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
-      String recordString = new String(currValue);
-      if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
-      StringTokenizer st = new StringTokenizer(recordString, ",");
-      while (st.hasMoreElements()) {
-        String token = st.nextElement().toString() ;
-        if (LOG.isDebugEnabled()) LOG.debug("token is " + token);
-        transactionList.add(token);
-      }
-
-      if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit with list size (" + transactionList.size() + ")");
-      return transactionList;
-
-    }
+   public long getRecord(final int clusterId, final String controlPt) throws IOException {
 
-   public long getRecord(final String controlPt) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("getRecord " + controlPt);
+      if (LOG.isTraceEnabled()) LOG.trace("getRecord clusterId: " + clusterId + " controlPt: " + controlPt);
       long lvValue = -1;
-      Get g = new Get(Bytes.toBytes(controlPt));
-      String recordString;
-      Result r = table.get(g);
-      byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
-      if (currValue != null)
-      {
-          recordString = new String (Bytes.toString(currValue));
-          if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
-          lvValue = Long.parseLong(recordString, 10);
-      }
-      if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit " + lvValue);
+      Get g = new Get(Bytes.toBytes(clusterId));
+      g.setMaxVersions(versions);  // will return last n versions of row
+      g.addColumn(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM);
+      String ctrlPtToken;
+      String asnToken;
+         Result r = table.get(g);
+         if (r.isEmpty())
+            return lvValue;
+         List<Cell> list = r.getColumnCells(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM);  // returns all versions of this column
+         for (Cell element : list) {
+            StringTokenizer stok = new StringTokenizer(Bytes.toString(CellUtil.cloneValue(element)), ",");
+            if (stok.hasMoreElements()) {
+               ctrlPtToken = stok.nextElement().toString();
+               if (LOG.isTraceEnabled()) LOG.trace("Parsing record for controlPt (" + ctrlPtToken + ")");
+               asnToken = stok.nextElement().toString();
+               if (Long.parseLong(ctrlPtToken, 10) == Long.parseLong(controlPt, 10)){
+                  // This is the one we are looking for
+                  lvValue = Long.parseLong(asnToken, 10);
+                  if (LOG.isTraceEnabled()) LOG.trace("ASN value for controlPt: " + controlPt + " is: " + lvValue);
+                  return lvValue;
+               }
+            }
+            else {
+               if (LOG.isTraceEnabled()) LOG.trace("No tokens to parse for controlPt (" + controlPt + ")");
+            }
+         }
+         if (LOG.isTraceEnabled()) LOG.trace("all results scannned for clusterId: " + clusterId + ", but controlPt: " + controlPt + " not found");
       return lvValue;
+   }
 
-    }
-
-   public long getStartingAuditSeqNum() throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum");
-      String controlPtString = new String(String.valueOf(currControlPt));
-      long lvAsn;
-      if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum new get for control point " + currControlPt);
-      Get g = new Get(Bytes.toBytes(controlPtString));
-      if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum setting result");
-      Result r = table.get(g);
-      if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum currValue CONTROL_POINT_FAMILY is "
-                 + CONTROL_POINT_FAMILY + " ASN_HIGH_WATER_MARK " + ASN_HIGH_WATER_MARK);
-      byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
-      if (LOG.isDebugEnabled()) LOG.debug("Starting asn setting recordString ");
-      if (currValue == null)
-         lvAsn = 1;
-      else
-      {
-         String recordString = new String(currValue);
-         lvAsn = Long.valueOf(recordString);
-      }
+   public long getStartingAuditSeqNum(final int clusterId) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum for clusterId: " + clusterId);
+      long lvAsn = 1;
+
+      Get g = new Get(Bytes.toBytes(clusterId));
+      String asnToken;
+      if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum attempting table.get");
+         Result r = table.get(g);
+         if (r.isEmpty())
+            return lvAsn;
+         if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum Result: " + r);
+         String value = Bytes.toString(r.getValue(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM));
+         // In theory the above value is the latest version of the column
+         if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum for clusterId: " + clusterId + ", valueString is " + value);
+         StringTokenizer stok = new StringTokenizer(value, ",");
+         if (stok.hasMoreElements()) {
+            if (LOG.isTraceEnabled()) LOG.trace("Parsing record in getStartingAuditSeqNum");
+            stok.nextElement();  // skip the control point token
+            asnToken = stok.nextElement().toString();
+            lvAsn = Long.parseLong(asnToken, 10);
+            if (LOG.isTraceEnabled()) LOG.trace("Value for getStartingAuditSeqNum and clusterId: "
+                + clusterId + " is: " + lvAsn);
+         }
       if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum - exit returning " + lvAsn);
       return lvAsn;
     }
 
-   public long getNextAuditSeqNum(int nid) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum for node: " + nid);
+   public long getNextAuditSeqNum(int clusterId, int nid) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum for cluster " + clusterId + " node: " + nid);
 
       // We need to open the appropriate control point table and read the value from it
       Table remoteTable;
       String lv_tName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(nid) + "_CONTROL_POINT");
-      Connection remoteConnection = ConnectionFactory.createConnection(this.config);
-      remoteTable = remoteConnection.getTable(TableName.valueOf(lv_tName));
+      remoteTable = connection.getTable(TableName.valueOf(lv_tName));
+
+      long lvAsn = 1;
 
-      long highValue = -1;
       try {
-         Scan s = new Scan();
-         s.setCaching(10);
-         s.setCacheBlocks(false);
-         if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum resultScanner");
-         ResultScanner ss = remoteTable.getScanner(s);
-         try {
-            long currValue;
-            String rowKey;
-            if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum entering for loop" );
-            for (Result r : ss) {
-               rowKey = new String(r.getRow());
-               if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum rowKey is " + rowKey );
-               currValue =  Long.parseLong(Bytes.toString(r.value()));
-               if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum value is " + currValue);
-               if (currValue > highValue) {
-                  if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum Setting highValue to " + currValue);
-                  highValue = currValue;
-               }
+         Get g = new Get(Bytes.toBytes(clusterId));
+         if (LOG.isDebugEnabled()) LOG.debug("getNextAuditSeqNum attempting remoteTable.get");
+         Result r = remoteTable.get(g);
+         if (!r.isEmpty()){
+            if (LOG.isDebugEnabled()) LOG.debug("getNextAuditSeqNum Result: " + r);
+            String value = Bytes.toString(r.getValue(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM));
+            // In theory the above value is the latest version of the column
+            if (LOG.isDebugEnabled()) LOG.debug("getNextAuditSeqNum for clusterId: " + clusterId + ", valueString is " + value);
+            StringTokenizer stok = new StringTokenizer(value, ",");
+            if (stok.hasMoreElements()) {
+               if (LOG.isTraceEnabled()) LOG.trace("Parsing record in getNextAuditSeqNum");
+               stok.nextElement();  // skip the control point token
+               String asnToken = stok.nextElement().toString();
+               lvAsn = Long.parseLong(asnToken, 10);
+               if (LOG.isTraceEnabled()) LOG.trace("Value for getNextAuditSeqNum and clusterId: "
+                            + clusterId + " is: " + (lvAsn + 1));
             }
-         } finally {
-            ss.close();
          }
       } finally {
-         try {
-            remoteTable.close();
-            remoteConnection.close();
-         }
-         catch (IOException e) {
-            LOG.error("getNextAuditSeqNum IOException closing table or connection for " + lv_tName);
-            e.printStackTrace();
-         }
+         remoteTable.close();
       }
-      if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum returning " + (highValue + 1));
-      return (highValue + 1);
-    }
+      if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum returning " + (lvAsn + 1));
+      return (lvAsn + 1);
+   }
+
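getNextAuditSeqNum above reads a peer node's control point table through the shared Connection rather than opening a new connection per call. A minimal sketch of that lookup, assuming the HBase 1.x client API used elsewhere in this file (the helper name is hypothetical; the caller closes the returned Table):

   Table openControlPointTableForNode(final Connection connection, final int nid) throws IOException {
      String lv_tName = "TRAFODION._DTM_.TLOG" + String.valueOf(nid) + "_CONTROL_POINT";
      return connection.getTable(TableName.valueOf(lv_tName));
   }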
 
+   public long doControlPoint(final int clusterId, final long sequenceNumber, final boolean incrementCP) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("doControlPoint start");
 
-   public long doControlPoint(final long sequenceNumber) throws IOException {
-      currControlPt++;
-      if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), sequenceNumber (" + sequenceNumber+ ") try putRecord");
-      putRecord(currControlPt, sequenceNumber);
+      if (incrementCP) {
+         currControlPt++;
+      }
+      if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), clusterId: " + clusterId + ", sequenceNumber (" + sequenceNumber + ") try putRecord");
+      putRecord(clusterId, currControlPt, sequenceNumber);
+      if (LOG.isTraceEnabled()) LOG.trace("doControlPoint - exit");
       return currControlPt;
    }
 
+   public long bumpControlPoint(final int clusterId, final int count) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("bumpControlPoint start, count: " + count);
+      long currASN = -1;
+         currControlPt = getCurrControlPt(clusterId);
+         currASN = getStartingAuditSeqNum(clusterId);
+         for ( int i = 0; i < count; i++ ) {
+            currControlPt++;
+            if (LOG.isTraceEnabled()) LOG.trace("bumpControlPoint putting new record " + (i + 1) + " for control point ("
+                 + currControlPt + "), clusterId: " + clusterId + ", ASN (" + currASN + ")");
+            putRecord(clusterId, currControlPt, currASN);
+         }
+      if (LOG.isTraceEnabled()) LOG.trace("bumpControlPoint - exit");
+      return currASN;
+   }
+
    public boolean deleteRecord(final long controlPoint) throws IOException {
       if (LOG.isTraceEnabled()) LOG.trace("deleteRecord start for control point " + controlPoint);
       String controlPtString = new String(String.valueOf(controlPoint));
@@ -317,33 +385,80 @@ public class HBaseAuditControlPoint {
       return true;
    }
 
-   public boolean deleteAgedRecords(final long controlPoint) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords start - control point " + controlPoint);
+   public boolean deleteAgedRecords(final int clusterId, final long controlPoint) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords start - clusterId " + clusterId + " control point " + controlPoint);
       String controlPtString = new String(String.valueOf(controlPoint));
 
-      Scan s = new Scan();
-      s.setCaching(10);
-      s.setCacheBlocks(false);
       ArrayList<Delete> deleteList = new ArrayList<Delete>();
-      ResultScanner ss = table.getScanner(s);
-      try {
-         String rowKey;
-         for (Result r : ss) {
-            rowKey = new String(r.getRow());
-            if (Long.parseLong(rowKey) < controlPoint) {
-               if (LOG.isDebugEnabled()) LOG.debug("Adding  (" + rowKey + ") to delete list");
-               Delete del = new Delete(rowKey.getBytes());
-               deleteList.add(del);
+      Get g = new Get(Bytes.toBytes(clusterId));
+      g.setMaxVersions(versions);  // will return last n versions of row
+      g.addColumn(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM);
+      String ctrlPtToken;
+         Result r = table.get(g);
+         if (r.isEmpty())
+            return false;
+         List<Cell> list = r.getColumnCells(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM);  // returns all versions of this column
+         for (Cell cell : list) {
+            StringTokenizer stok = 
+                    new StringTokenizer(Bytes.toString(CellUtil.cloneValue(cell)), ",");
+            if (stok.hasMoreElements()) {
+               ctrlPtToken = stok.nextElement().toString();
+               if (LOG.isTraceEnabled()) LOG.trace("Parsing record for controlPoint (" + ctrlPtToken + ")");
+               if (Long.parseLong(ctrlPtToken, 10) <= controlPoint){
+                  // This is one we are looking for
+                  Delete del = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getTimestamp());
+                  deleteList.add(del);
+                  if (LOG.isTraceEnabled()) LOG.trace("Deleting entry for ctrlPtToken: " + ctrlPtToken);
+               }
+            }
+            else {
+               if (LOG.isTraceEnabled()) LOG.trace("No tokens to parse for controlPoint (" + controlPoint + ")");
             }
          }
          if (LOG.isDebugEnabled()) LOG.debug("attempting to delete list with " + deleteList.size() + " elements");
          table.delete(deleteList);
-      } finally {
-         ss.close();
-      }
-
       if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords - exit");
       return true;
    }
+   
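Taken together, doControlPoint and deleteAgedRecords form the write-and-prune cycle for a cluster's control point row. A minimal sketch of one such cycle, assuming a caller that already holds an HBaseAuditControlPoint instance (the method name and the keep policy value are hypothetical):

   // Hypothetical driver: record a new control point for clusterId at the given
   // ASN, then prune versions that are at least `keep` control points behind it.
   long controlPointCycle(final HBaseAuditControlPoint cp, final int clusterId,
                          final long currentAsn, final long keep) throws IOException {
      long newCp = cp.doControlPoint(clusterId, currentAsn, /* incrementCP */ true);
      if (newCp > keep) {
         cp.deleteAgedRecords(clusterId, newCp - keep);
      }
      return newCp;
   }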
+   public String getTableName(){
+      return CONTROL_POINT_TABLE_NAME;
+   }
+   
+   public long getNthRecord(int clusterId, int n) throws IOException{
+      if (LOG.isTraceEnabled()) LOG.trace("getNthRecord start - clusterId " + clusterId + " n: " + n);
+
+      Get g = new Get(Bytes.toBytes(clusterId));
+      g.setMaxVersions(n + 1);  // will return last n+1 versions of row just in case
+      g.addColumn(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM);
+      String ctrlPtToken;
+      long lvReturn = 1;
+         Result r = table.get(g);
+         if (r.isEmpty())
+            return lvReturn; 
+         List<Cell> list = r.getColumnCells(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM);  // returns all versions of this column
+         int i = 0;
+         for (Cell cell : list) {
+            i++;
+            StringTokenizer stok = 
+                    new StringTokenizer(Bytes.toString(CellUtil.cloneValue(cell)), ",");
+            if (stok.hasMoreElements()) {
+               ctrlPtToken = stok.nextElement().toString();
+               if (LOG.isTraceEnabled()) LOG.trace("Parsing record for controlPoint (" + ctrlPtToken + ")");
+               if ( i < n ){
+                  if (LOG.isTraceEnabled()) LOG.trace("Skipping record " + i + " of " + n + " for controlPoint" );
+                  continue;
+               }
+               lvReturn = Long.parseLong(ctrlPtToken);
+               if (LOG.isTraceEnabled()) LOG.trace("getNthRecord exit - returning " + lvReturn);
+               return lvReturn;
+            }
+            else {
+               if (LOG.isTraceEnabled()) LOG.trace("No tokens to parse for " + i);
+            }
+         }
+      if (LOG.isTraceEnabled()) LOG.trace("getNthRecord - exit returning 1");
+      return 1;
+   }
 }
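The multi-version reads in getRecord, deleteAgedRecords, and getNthRecord all follow the same pattern: a Get keyed by clusterId with setMaxVersions, then getColumnCells to walk the retained versions newest-first. A minimal sketch of that pattern with the HBase 1.x client API (the helper name is hypothetical; family and qualifier stand in for the CONTROL_POINT_FAMILY/CP_NUM_AND_ASN_HWM constants declared in this class):

   // Hypothetical helper: return all retained versions of the cp/asn column for
   // one cluster, newest first, or an empty list if the row does not exist.
   List<Cell> readControlPointVersions(final Table table, final int clusterId,
                                       final byte[] family, final byte[] qualifier,
                                       final int maxVersions) throws IOException {
      Get g = new Get(Bytes.toBytes(clusterId));
      g.setMaxVersions(maxVersions);            // request up to n versions of the row
      g.addColumn(family, qualifier);           // restrict to the cp/asn column
      Result r = table.get(g);
      if (r.isEmpty()) {
         return new ArrayList<Cell>();
      }
      return r.getColumnCells(family, qualifier);  // newest version first
   }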