You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2015/10/14 03:14:46 UTC

[03/11] incubator-trafodion git commit: TRAFODION-1521 Build Trafodion without having HBase installed

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/TmAuditTlog.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/TmAuditTlog.java b/core/sqf/src/seatrans/tm/hbasetmlib2/TmAuditTlog.java
deleted file mode 100644
index 22ad0e3..0000000
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/TmAuditTlog.java
+++ /dev/null
@@ -1,1201 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.dtm;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.Logger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.transactional.TransactionManager;
-import org.apache.hadoop.hbase.client.transactional.TransactionState;
-import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
-import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
-import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
-import org.apache.hadoop.hbase.client.transactional.TransactionRegionLocation;
-import org.apache.hadoop.hbase.client.transactional.TransState;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.LocalHBaseCluster;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-
-public class TmAuditTlog {
-
-   static final Log LOG = LogFactory.getLog(TmAuditTlog.class);
-   private static HBaseAdmin admin;
-   private Configuration config;
-   private static String TLOG_TABLE_NAME;
-   private static final byte[] TLOG_FAMILY = Bytes.toBytes("tf");
-   private static final byte[] ASN_STATE = Bytes.toBytes("as");
-   private static final byte[] QUAL_TX_STATE = Bytes.toBytes("tx");
-   private static HTable[] table;
-   private static HBaseAuditControlPoint tLogControlPoint;
-   private static long tLogControlPointNum;
-   private static long tLogHashKey;
-   private static int  tLogHashShiftFactor;
-   private int dtmid;
-
-   // For performance metrics
-   private static long[] startTimes;
-   private static long[] endTimes;
-   private static long[] synchTimes;
-   private static long[] bufferSizes;
-   private static AtomicInteger  timeIndex;
-   private static long   totalWriteTime;
-   private static long   totalSynchTime;
-   private static long   totalPrepTime;
-   private static AtomicLong  totalWrites;
-   private static AtomicLong  totalRecords;
-   private static long   minWriteTime;
-   private static long   minWriteTimeBuffSize;
-   private static long   maxWriteTime; 
-   private static long   maxWriteTimeBuffSize;
-   private static double avgWriteTime;
-   private static long   minPrepTime;
-   private static long   maxPrepTime;
-   private static double avgPrepTime;
-   private static long   minSynchTime;
-   private static long   maxSynchTime;
-   private static double avgSynchTime;
-   private static long   minBufferSize;
-   private static long   maxBufferSize;
-   private static double avgBufferSize;
-
-   private static int     versions;
-   private static int     tlogNumLogs;
-   private boolean useAutoFlush;
-   private static boolean ageCommitted;
-   private static boolean forceControlPoint;
-   private boolean disableBlockCache;
-   private boolean controlPointDeferred;
- 
-   private static AtomicLong asn;  // Audit sequence number is the monotonic increasing value of the tLog write
-
-   private static Object tlogAuditLock[];        // Lock for synchronizing access via regions.
-
-   private static Object tablePutLock;            // Lock for synchronizing table.put operations
-                                                  // to avoid ArrayIndexOutOfBoundsException
-   private static byte filler[];
-
-   public static final int TM_TX_STATE_NOTX = 0; //S0 - NOTX
-   public static final int TM_TX_STATE_ACTIVE = 1; //S1 - ACTIVE
-   public static final int TM_TX_STATE_FORGOTTEN = 2; //N/A
-   public static final int TM_TX_STATE_COMMITTED = 3; //N/A
-   public static final int TM_TX_STATE_ABORTING = 4; //S4 - ROLLBACK
-   public static final int TM_TX_STATE_ABORTED = 5; //S4 - ROLLBACK
-   public static final int TM_TX_STATE_COMMITTING = 6; //S3 - PREPARED
-   public static final int TM_TX_STATE_PREPARING = 7; //S2 - IDLE
-   public static final int TM_TX_STATE_FORGETTING = 8; //N/A
-   public static final int TM_TX_STATE_PREPARED = 9; //S3 - PREPARED XARM Branches only!
-   public static final int TM_TX_STATE_FORGETTING_HEUR = 10; //S5 - HEURISTIC
-   public static final int TM_TX_STATE_BEGINNING = 11; //S1 - ACTIVE
-   public static final int TM_TX_STATE_HUNGCOMMITTED = 12; //N/A
-   public static final int TM_TX_STATE_HUNGABORTED = 13; //S4 - ROLLBACK
-   public static final int TM_TX_STATE_IDLE = 14; //S2 - IDLE XARM Branches only!
-   public static final int TM_TX_STATE_FORGOTTEN_HEUR = 15; //S5 - HEURISTIC - Waiting Superior TM xa_forget request
-   public static final int TM_TX_STATE_ABORTING_PART2 = 16; // Internal State
-   public static final int TM_TX_STATE_TERMINATING = 17;
-   public static final int TM_TX_STATE_LAST = 17;
-
-   private class AuditBuffer{
-      private ArrayList<Put> buffer;           // Each Put is an audit record
-
-      private AuditBuffer () {
-         buffer = new  ArrayList<Put>();
-         buffer.clear();
-
-      }
-
-      private void bufferAdd(Put localPut) throws Exception {
-         long threadId = Thread.currentThread().getId();
-         if (LOG.isTraceEnabled()) LOG.trace("BufferAdd start in thread " + threadId );
-         try {
-            buffer.add(localPut);
-         }
-         catch (Exception e) {
-            if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying bufferAdd" + e);
-            throw e;
-         }
-         if (LOG.isTraceEnabled()) LOG.trace("BufferAdd end in thread " + threadId );
-      }
-
-      private int bufferSize() throws Exception {
-         int lvSize;
-         long threadId = Thread.currentThread().getId();
-         if (LOG.isTraceEnabled()) LOG.trace("BufferSize start in thread " + threadId );
-         try {
-            lvSize = buffer.size();
-         }
-         catch (Exception e) {
-            if (LOG.isDebugEnabled()) LOG.debug("AuditBuffer Exception trying bufferSize" + e);
-            throw e;
-         }
-         if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferSize end; returning " + lvSize + " in thread " 
-                    +  Thread.currentThread().getId());
-         return lvSize;
-      }
-
-      private void bufferClear() throws Exception {
-         long threadId = Thread.currentThread().getId();
-         if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferClear start in thread " + threadId);
-         try {
-            buffer.clear();
-         }
-         catch (Exception e) {
-            if (LOG.isDebugEnabled()) LOG.debug("Exception trying bufferClear.clear" + e);
-            throw e;
-         }
-         if (LOG.isTraceEnabled()) LOG.trace("AuditBuffer bufferClear end in thread " + threadId);
-      }
-
-      private ArrayList<Put> getBuffer() throws Exception {
-         long threadId = Thread.currentThread().getId();
-         if (LOG.isTraceEnabled()) LOG.trace("getBuffer start in thread " + threadId );
-         return this.buffer;
-      }
-   }// End of class AuditBuffer
-
-   public class TmAuditTlogRegionSplitPolicy extends RegionSplitPolicy {
-
-      @Override
-      protected boolean shouldSplit(){
-         return false;
-      }
-   }
-
-   public TmAuditTlog (Configuration config) throws IOException, RuntimeException {
-
-      this.config = config;
-      this.dtmid = Integer.parseInt(config.get("dtmid"));
-      if (LOG.isTraceEnabled()) LOG.trace("Enter TmAuditTlog constructor for dtmid " + dtmid);
-      TLOG_TABLE_NAME = config.get("TLOG_TABLE_NAME");
-      int fillerSize = 2;
-      controlPointDeferred = false;
-
-      forceControlPoint = false;
-      try {
-         String controlPointFlush = System.getenv("TM_TLOG_FLUSH_CONTROL_POINT");
-         if (controlPointFlush != null){
-            forceControlPoint = (Integer.parseInt(controlPointFlush) != 0);
-            if (LOG.isTraceEnabled()) LOG.trace("controlPointFlush != null");
-         }
-      }
-      catch (Exception e) {
-         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_FLUSH_CONTROL_POINT is not in ms.env");
-      }
-      LOG.info("forceControlPoint is " + forceControlPoint);
-
-      useAutoFlush = true;
-      try {
-         String autoFlush = System.getenv("TM_TLOG_AUTO_FLUSH");
-         if (autoFlush != null){
-            useAutoFlush = (Integer.parseInt(autoFlush) != 0);
-            if (LOG.isTraceEnabled()) LOG.trace("autoFlush != null");
-         }
-      }
-      catch (Exception e) {
-         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_AUTO_FLUSH is not in ms.env");
-      }
-      LOG.info("useAutoFlush is " + useAutoFlush);
-
-      ageCommitted = false;
-      try {
-         String ageCommittedRecords = System.getenv("TM_TLOG_AGE_COMMITTED_RECORDS");
-         if (ageCommittedRecords != null){
-            ageCommitted = (Integer.parseInt(ageCommittedRecords) != 0);
-            if (LOG.isTraceEnabled()) LOG.trace("ageCommittedRecords != null");
-         }
-      }
-      catch (Exception e) {
-         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_AGE_COMMITTED_RECORDS is not in ms.env");
-      }
-      LOG.info("ageCommitted is " + ageCommitted);
-
-      versions = 5;
-      try {
-         String maxVersions = System.getenv("TM_TLOG_MAX_VERSIONS");
-         if (maxVersions != null){
-            versions = (Integer.parseInt(maxVersions) > versions ? Integer.parseInt(maxVersions) : versions);
-         }
-      }
-      catch (Exception e) {
-         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_MAX_VERSIONS is not in ms.env");
-      }
-
-      tlogNumLogs = 1;
-      try {
-         String numLogs = System.getenv("TM_TLOG_NUM_LOGS");
-         if (numLogs != null) {
-            tlogNumLogs = Math.max( 1, Integer.parseInt(numLogs));
-         }
-      }
-      catch (Exception e) {
-         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_NUM_LOGS is not in ms.env");
-      }
-      disableBlockCache = false;
-      try {
-         String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
-         if (blockCacheString != null){
-            disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
-            if (LOG.isTraceEnabled()) LOG.trace("disableBlockCache != null");
-         }
-      }
-      catch (Exception e) {
-         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_DISABLE_BLOCK_CACHE is not in ms.env");
-      }
-      LOG.info("disableBlockCache is " + disableBlockCache);
-
-      switch (tlogNumLogs) {
-        case 1:
-          tLogHashKey = 0b0;
-          tLogHashShiftFactor = 63;
-          break;
-        case 2:
-          tLogHashKey = 0b1;
-          tLogHashShiftFactor = 63;
-          break;
-        case 4:
-          tLogHashKey = 0b11;
-          tLogHashShiftFactor = 62;
-          break;
-        case 8:
-          tLogHashKey = 0b111;
-          tLogHashShiftFactor = 61;
-          break;
-        case 16:
-          tLogHashKey = 0b1111;
-          tLogHashShiftFactor = 60;
-          break;
-        case 32:
-          tLogHashKey = 0b11111;
-          tLogHashShiftFactor = 59;
-          break;
-        default : {
-          LOG.error("TM_TLOG_NUM_LOGS must b 1 or a power of 2 in the range 2-32");
-          throw new RuntimeException();
-        }
-      }
-      if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_NUM_LOGS is " + tlogNumLogs);
-
-      HColumnDescriptor hcol = new HColumnDescriptor(TLOG_FAMILY);
-      if (disableBlockCache) {
-         hcol.setBlockCacheEnabled(false);
-      }
-      hcol.setMaxVersions(versions);
-      admin = new HBaseAdmin(config);
-
-      filler = new byte[fillerSize];
-      Arrays.fill(filler, (byte) ' ');
-      startTimes      =    new long[50];
-      endTimes        =    new long[50];
-      synchTimes      =    new long[50];
-      bufferSizes     =    new long[50];
-      totalWriteTime  =    0;
-      totalSynchTime  =    0;
-      totalPrepTime   =    0;
-      totalWrites     =    new AtomicLong(0);
-      totalRecords    =    new AtomicLong(0);
-      minWriteTime    =    1000000000;
-      minWriteTimeBuffSize  =    0;
-      maxWriteTime    =    0;
-      maxWriteTimeBuffSize  =    0;
-      avgWriteTime    =    0;
-      minPrepTime     =    1000000000;
-      maxPrepTime     =    0;
-      avgPrepTime     =    0;
-      minSynchTime    =    1000000000;
-      maxSynchTime    =    0;
-      avgSynchTime    =    0;
-      minBufferSize   =    1000;
-      maxBufferSize   =    0;
-      avgBufferSize   =    0;
-      timeIndex       =    new AtomicInteger(1);
-
-      asn = new AtomicLong();  // Monotonically increasing count of write operations
-
-      long lvAsn = 0;
-
-      try {
-         if (LOG.isTraceEnabled()) LOG.trace("try new HBaseAuditControlPoint");
-         tLogControlPoint = new HBaseAuditControlPoint(config);
-      }
-      catch (Exception e) {
-         LOG.error("Unable to create new HBaseAuditControlPoint object " + e);
-      }
-
-      tlogAuditLock =    new Object[tlogNumLogs];
-      table = new HTable[tlogNumLogs];
-
-      try {
-         // Get the asn from the last control point.  This ignores 
-         // any asn increments between the last control point
-         // write and a system crash and could result in asn numbers
-         // being reused.  However this would just mean that some old 
-         // records are held onto a bit longer before cleanup and is safe.
-         asn.set(tLogControlPoint.getStartingAuditSeqNum());
-      }
-      catch (Exception e2){
-         if (LOG.isDebugEnabled()) LOG.debug("Exception setting the ASN " + e2);
-         if (LOG.isDebugEnabled()) LOG.debug("Setting the ASN to 1");
-         asn.set(1L);  // Couldn't read the asn so start asn at 1
-      }
-
-      for (int i = 0 ; i < tlogNumLogs; i++) {
-         tlogAuditLock[i]      = new Object();
-         String lv_tLogName = new String(TLOG_TABLE_NAME + "_LOG_" + Integer.toHexString(i));
-         boolean lvTlogExists = admin.tableExists(lv_tLogName);
-         if (LOG.isTraceEnabled()) LOG.trace("Tlog table " + lv_tLogName + (lvTlogExists? " exists" : " does not exist" ));
-         HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(lv_tLogName));
-         desc.addFamily(hcol);
-
-          if (lvTlogExists == false) {
-            // Need to prime the asn for future writes
-            try {
-               if (LOG.isTraceEnabled()) LOG.trace("Creating the table " + lv_tLogName);
-               admin.createTable(desc);
-               asn.set(1L);  // TLOG didn't exist previously, so start asn at 1
-            }
-            catch (TableExistsException e) {
-               LOG.error("Table " + lv_tLogName + " already exists");
-            }
-         }
-         try {
-            if (LOG.isTraceEnabled()) LOG.trace("try new HTable index " + i);
-            table[i] = new HTable(config, desc.getName());
-         }
-         catch(Exception e){
-            LOG.error("TmAuditTlog Exception on index " + i + "; " + e);
-            throw new RuntimeException(e);
-         }
-
-         table[i].setAutoFlushTo(this.useAutoFlush);
-
-      }
-
-      lvAsn = asn.get();
-      // This control point write needs to be delayed until after recovery completes, 
-      // but is here as a placeholder
-      if (LOG.isTraceEnabled()) LOG.trace("Starting a control point with asn value " + lvAsn);
-      tLogControlPointNum = tLogControlPoint.doControlPoint(lvAsn);
-
-      if (LOG.isTraceEnabled()) LOG.trace("Exit constructor()");
-      return;
-   }
-
-   public long getNextAuditSeqNum(int nid) throws IOException{
-      if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum node: " + nid);
-      return tLogControlPoint.getNextAuditSeqNum(nid);
-   }
-
-   public static long asnGetAndIncrement () {
-      if (LOG.isTraceEnabled()) LOG.trace("asnGetAndIncrement");
-      return asn.getAndIncrement();
-   }
-
-   public void putSingleRecord(final long lvTransid, final long lvCommitId, final String lvTxState, final Set<TransactionRegionLocation> regions, boolean forced) throws Exception {
-      putSingleRecord(lvTransid, lvCommitId, lvTxState, regions, forced, -1);
-   }
-
-   public void putSingleRecord(final long lvTransid, final long lvCommitId, final String lvTxState, final Set<TransactionRegionLocation> regions, boolean forced, long recoveryASN) throws Exception {
-      long threadId = Thread.currentThread().getId();
-      if (LOG.isTraceEnabled()) LOG.trace("putSingleRecord start in thread " + threadId);
-      StringBuilder tableString = new StringBuilder();
-      String transidString = new String(String.valueOf(lvTransid));
-      String commitIdString = new String(String.valueOf(lvCommitId));
-      boolean lvResult = true;
-      long lvAsn;
-      long startSynch = 0;
-      long endSynch = 0;
-      int lv_lockIndex = 0;
-      int lv_TimeIndex = (timeIndex.getAndIncrement() % 50 );
-      long lv_TotalWrites = totalWrites.incrementAndGet();
-      long lv_TotalRecords = totalRecords.incrementAndGet();
-      if (regions != null) {
-         // Regions passed in indicate a state record where recovery might be needed following a crash.
-         // To facilitate branch notification we translate the regions into table names that can then
-         // be translated back into new region names following a restart.  THis allows us to ensure all
-         // branches reply prior to cleanup
-         Iterator<TransactionRegionLocation> it = regions.iterator();
-         List<String> tableNameList = new ArrayList<String>();
-         while (it.hasNext()) {
-            String name = new String(it.next().getRegionInfo().getTable().getNameAsString());
-            if ((name.length() > 0) && (tableNameList.contains(name) != true)){
-               // We have a table name not already in the list
-               tableNameList.add(name);
-               tableString.append(",");
-               tableString.append(name);
-            }
-         }
-         if (LOG.isTraceEnabled()) LOG.trace("table names: " + tableString.toString());
-      }
-      //Create the Put as directed by the hashed key boolean
-      Put p;
-
-      //create our own hashed key
-      long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
-      lv_lockIndex = (int)(lvTransid & tLogHashKey);
-      if (LOG.isTraceEnabled()) LOG.trace("key: " + key + ", hex: " + Long.toHexString(key) + ", transid: " +  lvTransid);
-      p = new Put(Bytes.toBytes(key));
-
-      if (recoveryASN == -1){
-         // This is a normal audit record so we manage the ASN
-         lvAsn = asn.getAndIncrement();
-      }
-      else {
-         // This is a recovery audit record so use the ASN passed in
-         lvAsn = recoveryASN;
-      }
-      if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " + lvTxState + " ASN: " + lvAsn);
-      p.add(TLOG_FAMILY, ASN_STATE, Bytes.toBytes(String.valueOf(lvAsn) + ","
-                       + transidString + "," + lvTxState
-                       + "," + Bytes.toString(filler)
-                       + "," + commitIdString
-                       + "," + tableString.toString()));
-
-
-      if (recoveryASN != -1){
-         // We need to send this to a remote Tlog, not our local one, so open the appropriate table
-         HTableInterface recoveryTable;
-         int lv_ownerNid = (int)(lvTransid >> 32);
-         String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(lv_ownerNid) + "_LOG_" + Integer.toHexString(lv_lockIndex));
-         HConnection recoveryTableConnection = HConnectionManager.createConnection(this.config);
-         recoveryTable = recoveryTableConnection.getTable(TableName.valueOf(lv_tLogName));
-
-         try {
-            recoveryTable.put(p);
-         }
-         catch (Exception e2){
-            // create record of the exception
-            LOG.error("putSingleRecord Exception in recoveryTable" + e2);
-            e2.printStackTrace();
-            throw e2;
-         }
-         finally {
-            try {
-               recoveryTable.close();
-               recoveryTableConnection.close();
-            }
-            catch (IOException e) {
-               LOG.error("putSingleRecord IOException closing recovery table or connection for table " + lv_tLogName);
-               e.printStackTrace();
-            }
-         }
-      }
-      else {
-         // THis goes to our local TLOG
-         if (LOG.isTraceEnabled()) LOG.trace("TLOG putSingleRecord synchronizing tlogAuditLock[" + lv_lockIndex + "] in thread " + threadId );
-         startSynch = System.nanoTime();
-         try {
-            synchronized (tlogAuditLock[lv_lockIndex]) {
-               endSynch = System.nanoTime();
-               try {
-                  if (LOG.isTraceEnabled()) LOG.trace("try table.put " + p );
-                  startTimes[lv_TimeIndex] = System.nanoTime();
-                  table[lv_lockIndex].put(p);
-                  if ((forced) && (useAutoFlush == false)) {
-                     if (LOG.isTraceEnabled()) LOG.trace("flushing commits");
-                     table[lv_lockIndex].flushCommits();
-                  }
-                  endTimes[lv_TimeIndex] = System.nanoTime();
-               }
-               catch (Exception e2){
-                  // create record of the exception
-                  LOG.error("putSingleRecord Exception " + e2);
-                  e2.printStackTrace();
-                  throw e2;
-               }
-            } // End global synchronization
-         }
-         catch (Exception e) {
-            // create record of the exception
-            LOG.error("Synchronizing on tlogAuditLock[" + lv_lockIndex + "] Exception " + e);
-            e.printStackTrace();
-            throw e;
-         }
-         if (LOG.isTraceEnabled()) LOG.trace("TLOG putSingleRecord synchronization complete in thread " + threadId );
-
-         synchTimes[lv_TimeIndex] = endSynch - startSynch;
-         totalSynchTime += synchTimes[lv_TimeIndex];
-         totalWriteTime += (endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]);
-         if (synchTimes[lv_TimeIndex] > maxSynchTime) {
-            maxSynchTime = synchTimes[lv_TimeIndex];
-         }
-         if (synchTimes[lv_TimeIndex] < minSynchTime) {
-            minSynchTime = synchTimes[lv_TimeIndex];
-         }
-         if ((endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]) > maxWriteTime) {
-            maxWriteTime = (endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]);
-         }
-         if ((endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]) < minWriteTime) {
-            minWriteTime = (endTimes[lv_TimeIndex] - startTimes[lv_TimeIndex]);
-         }
-
-         if (lv_TimeIndex == 49) {
-            timeIndex.set(1);  // Start over so we don't exceed the array size
-         }
-
-         if (lv_TotalWrites == 59999) {
-            avgWriteTime = (double) (totalWriteTime/lv_TotalWrites);
-            avgSynchTime = (double) (totalSynchTime/lv_TotalWrites);
-            LOG.info("TLog Audit Write Report\n" + 
-                   "                        Total records: "
-                       + lv_TotalRecords + " in " + lv_TotalWrites + " write operations\n" +
-                   "                        Write time:\n" +
-                   "                                     Min:  " 
-                       + minWriteTime / 1000 + " microseconds\n" +
-                   "                                     Max:  " 
-                       + maxWriteTime / 1000 + " microseconds\n" +
-                   "                                     Avg:  " 
-                       + avgWriteTime / 1000 + " microseconds\n" +
-                   "                        Synch time:\n" +
-                   "                                     Min:  " 
-                       + minSynchTime / 1000 + " microseconds\n" +
-                   "                                     Max:  " 
-                       + maxSynchTime / 1000 + " microseconds\n" +
-                   "                                     Avg:  " 
-                       + avgSynchTime / 1000 + " microseconds\n");
-
-            // Start at index 1 since there is no startTimes[0]
-            timeIndex.set(1);
-            endTimes[0]          = System.nanoTime();
-            totalWriteTime       = 0;
-            totalSynchTime       = 0;
-            totalPrepTime        = 0;
-            totalRecords.set(0);
-            totalWrites.set(0);
-            minWriteTime         = 50000;             // Some arbitrary high value
-            maxWriteTime         = 0;
-            minWriteTimeBuffSize = 0;
-            maxWriteTimeBuffSize = 0;
-            minSynchTime         = 50000;             // Some arbitrary high value
-            maxSynchTime         = 0;
-            minPrepTime          = 50000;            // Some arbitrary high value
-            maxPrepTime          = 0;
-            minBufferSize        = 1000;             // Some arbitrary high value
-            maxBufferSize        = 0;
-         }
-      }// End else revoveryASN == -1
-      if (LOG.isTraceEnabled()) LOG.trace("putSingleRecord exit");
-   }
-
-   public static int getRecord(final long lvTransid) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("getRecord start");
-      TransState lvTxState = TransState.STATE_NOTX;
-      String stateString;
-      int lv_lockIndex = (int)(lvTransid & tLogHashKey);
-      try {
-         String transidString = new String(String.valueOf(lvTransid));
-         Get g;
-         //create our own hashed key
-         long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
-         if (LOG.isTraceEnabled()) LOG.trace("key: " + key + " hex: " + Long.toHexString(key));
-         g = new Get(Bytes.toBytes(key));
-         try {
-            Result r = table[lv_lockIndex].get(g);
-            byte [] value = r.getValue(TLOG_FAMILY, ASN_STATE);
-            stateString =  new String (Bytes.toString(value));
-            if (LOG.isTraceEnabled()) LOG.trace("stateString is " + stateString);
-            if (stateString.compareTo("COMMITTED") == 0){
-               lvTxState = TransState.STATE_COMMITTED;
-            }
-            else if (stateString.compareTo("ABORTED") == 0){
-               lvTxState = TransState.STATE_ABORTED;
-            }
-            else if (stateString.compareTo("ACTIVE") == 0){
-               lvTxState = TransState.STATE_ACTIVE;
-            }
-            else if (stateString.compareTo("PREPARED") == 0){
-               lvTxState = TransState.STATE_PREPARED;
-            }
-            else if (stateString.compareTo("NOTX") == 0){
-               lvTxState = TransState.STATE_NOTX;
-            }
-            else if (stateString.compareTo("FORGOTTEN") == 0){
-               lvTxState = TransState.STATE_FORGOTTEN;
-            }
-            else if (stateString.compareTo("ABORTING") == 0){
-               lvTxState = TransState.STATE_ABORTING;
-            }
-            else if (stateString.compareTo("COMMITTING") == 0){
-               lvTxState = TransState.STATE_COMMITTING;
-            }
-            else if (stateString.compareTo("PREPARING") == 0){
-               lvTxState = TransState.STATE_PREPARING;
-            }
-            else if (stateString.compareTo("FORGETTING") == 0){
-               lvTxState = TransState.STATE_FORGETTING;
-            }
-            else if (stateString.compareTo("FORGETTING_HEUR") == 0){
-               lvTxState = TransState.STATE_FORGETTING_HEUR;
-            }
-            else if (stateString.compareTo("BEGINNING") == 0){
-               lvTxState = TransState.STATE_BEGINNING;
-            }
-            else if (stateString.compareTo("HUNGCOMMITTED") == 0){
-              lvTxState = TransState.STATE_HUNGCOMMITTED;
-            }
-            else if (stateString.compareTo("HUNGABORTED") == 0){
-               lvTxState = TransState.STATE_HUNGABORTED;
-            }
-            else if (stateString.compareTo("IDLE") == 0){
-               lvTxState = TransState.STATE_IDLE;
-            }
-            else if (stateString.compareTo("FORGOTTEN_HEUR") == 0){
-               lvTxState = TransState.STATE_FORGOTTEN_HEUR;
-            }
-            else if (stateString.compareTo("ABORTING_PART2") == 0){
-               lvTxState = TransState.STATE_ABORTING_PART2;
-            }
-            else if (stateString.compareTo("TERMINATING") == 0){
-               lvTxState = TransState.STATE_TERMINATING;
-            }
-            else {
-               lvTxState = TransState.STATE_BAD;
-            }
-
-            if (LOG.isTraceEnabled()) LOG.trace("transid: " + lvTransid + " state: " + lvTxState);
-         }
-         catch (IOException e){
-             LOG.error("getRecord IOException");
-             throw e;
-         }
-         catch (Exception e){
-             LOG.error("getRecord Exception");
-             throw e;
-         }
-      }
-      catch (Exception e2) {
-            LOG.error("getRecord Exception2 " + e2);
-            e2.printStackTrace();
-      }
-
-      if (LOG.isTraceEnabled()) LOG.trace("getRecord end; returning " + lvTxState);
-      return lvTxState.getValue();
-   }
-
-   public static String getRecord(final String transidString) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("getRecord start");
-      long lvTransid = Long.parseLong(transidString, 10);
-      int lv_lockIndex = (int)(lvTransid & tLogHashKey);
-      String lvTxState = new String("NO RECORD");
-      try {
-         Get g;
-         //create our own hashed key
-         long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
-         if (LOG.isTraceEnabled()) LOG.trace("key: " + key + " hex: " + Long.toHexString(key));
-         g = new Get(Bytes.toBytes(key));
-         try {
-            Result r = table[lv_lockIndex].get(g);
-            byte [] value = r.getValue(TLOG_FAMILY, ASN_STATE);
-            StringTokenizer st = new StringTokenizer(value.toString(), ",");
-            String asnToken = st.nextElement().toString();
-            String transidToken = st.nextElement().toString();
-            lvTxState = st.nextElement().toString();
-            if (LOG.isTraceEnabled()) LOG.trace("transid: " + transidToken + " state: " + lvTxState);
-         } catch (IOException e){
-             LOG.error("getRecord IOException");
-             throw e;
-         }
-      } catch (Exception e){
-             LOG.error("getRecord Exception " + e);
-             throw e;
-      }
-      if (LOG.isTraceEnabled()) LOG.trace("getRecord end; returning String:" + lvTxState);
-      return lvTxState;
-   }
-      
-
-   public static boolean deleteRecord(final long lvTransid) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("deleteRecord start " + lvTransid);
-      String transidString = new String(String.valueOf(lvTransid));
-      int lv_lockIndex = (int)(lvTransid & tLogHashKey);
-      try {
-         Delete d;
-         //create our own hashed key
-         long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
-         if (LOG.isTraceEnabled()) LOG.trace("key: " + key + " hex: " + Long.toHexString(key));
-         d = new Delete(Bytes.toBytes(key));
-         if (LOG.isTraceEnabled()) LOG.trace("deleteRecord  (" + lvTransid + ") ");
-         table[lv_lockIndex].delete(d);
-      }
-      catch (Exception e) {
-         LOG.error("deleteRecord Exception " + e );
-      }
-      if (LOG.isTraceEnabled()) LOG.trace("deleteRecord - exit");
-      return true;
-   }
-
-   public boolean deleteAgedEntries(final long lvAsn) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("deleteAgedEntries start:  Entries older than " + lvAsn + " will be removed");
-      HTableInterface deleteTable;
-      for (int i = 0; i < tlogNumLogs; i++) {
-         String lv_tLogName = new String(TLOG_TABLE_NAME + "_LOG_" + Integer.toHexString(i));
-//         Connection deleteConnection = ConnectionFactory.createConnection(this.config);
-
-         HConnection deleteConnection = HConnectionManager.createConnection(this.config);
-
-         deleteTable = deleteConnection.getTable(TableName.valueOf(lv_tLogName));
-         try {
-            Scan s = new Scan();
-            s.setCaching(100);
-            s.setCacheBlocks(false);
-            ArrayList<Delete> deleteList = new ArrayList<Delete>();
-            ResultScanner ss = deleteTable.getScanner(s);
-
-            try {
-               for (Result r : ss) {
-                  for (Cell cell : r.rawCells()) {
-                     String valueString = new String(CellUtil.cloneValue(cell));
-                     StringTokenizer st = new StringTokenizer(valueString, ",");
-                     if (st.hasMoreElements()) {
-                        String asnToken = st.nextElement().toString() ;
-                        String transidToken = st.nextElement().toString() ;
-                        String stateToken = st.nextElement().toString() ;
-                        if ((Long.parseLong(asnToken) < lvAsn) && (stateToken.equals("FORGOTTEN"))) {
-                           String rowKey = new String(r.getRow());
-                           Delete del = new Delete(r.getRow());
-                           if (LOG.isTraceEnabled()) LOG.trace("adding transid: " + transidToken + " to delete list");
-                           deleteList.add(del);
-                        }
-                        else if ((Long.parseLong(asnToken) < lvAsn) &&
-                                (stateToken.equals("COMMITTED") || stateToken.equals("ABORTED"))) {
-                           if (ageCommitted) {
-                              String rowKey = new String(r.getRow());
-                              Delete del = new Delete(r.getRow());
-                              if (LOG.isTraceEnabled()) LOG.trace("adding transid: " + transidToken + " to delete list");
-                              deleteList.add(del);
-                           }
-                           else {
-                              String key = new String(r.getRow());
-                              Get get = new Get(r.getRow());
-                              get.setMaxVersions(versions);  // will return last n versions of row
-                              Result lvResult = deleteTable.get(get);
-                             // byte[] b = lvResult.getValue(TLOG_FAMILY, ASN_STATE);  // returns current version of value
-                              List<Cell> list = lvResult.getColumnCells(TLOG_FAMILY, ASN_STATE);  // returns all versions of this column
-                              for (Cell element : list) {
-                                 String value = new String(CellUtil.cloneValue(element));
-                                 StringTokenizer stok = new StringTokenizer(value, ",");
-                                 if (stok.hasMoreElements()) {
-                                    if (LOG.isTraceEnabled()) LOG.trace("Performing secondary search on (" + transidToken + ")");
-                                    asnToken = stok.nextElement().toString() ;
-                                    transidToken = stok.nextElement().toString() ;
-                                    stateToken = stok.nextElement().toString() ;
-                                    if ((Long.parseLong(asnToken) < lvAsn) && (stateToken.equals("FORGOTTEN"))) {
-                                       Delete del = new Delete(r.getRow());
-                                       if (LOG.isTraceEnabled()) LOG.trace("Secondary search found new delete - adding (" + transidToken + ") with asn: " + asnToken + " to delete list");
-                                       deleteList.add(del);
-                                       break;
-                                    }
-                                    else {
-                                       if (LOG.isTraceEnabled()) LOG.trace("Secondary search skipping entry with asn: " + asnToken + ", state: " 
-                                                + stateToken + ", transid: " + transidToken );
-                                    }
-                                 }
-                              }
-                           }
-                        } else {
-                           if (LOG.isTraceEnabled()) LOG.trace("deleteAgedEntries skipping asn: " + asnToken + ", transid: " 
-                                     + transidToken + ", state: " + stateToken);
-                        }
-                     }
-                  }
-              }
-           }
-           catch(Exception e){
-              LOG.error("deleteAgedEntries Exception getting results for table index " + i + "; " + e);
-              throw new RuntimeException(e);
-           }
-           finally {
-              ss.close();
-           }
-           if (LOG.isTraceEnabled()) LOG.trace("attempting to delete list with " + deleteList.size() + " elements");
-           try {
-              deleteTable.delete(deleteList);
-           }
-           catch(IOException e){
-              LOG.error("deleteAgedEntries Exception deleting from table index " + i + "; " + e);
-              throw new RuntimeException(e);
-           }
-        }
-        catch (IOException e) {
-           LOG.error("deleteAgedEntries IOException setting up scan on table index " + i);
-           e.printStackTrace();
-        }
-        finally {
-           try {
-              deleteTable.close();
-              deleteConnection.close();
-           }
-           catch (IOException e) {
-              LOG.error("deleteAgedEntries IOException closing table or connection for table index " + i);
-              e.printStackTrace();
-           }
-        }
-     }
-     if (LOG.isTraceEnabled()) LOG.trace("deleteAgedEntries - exit");
-     return true;
-   }
-
-   public long writeControlPointRecords (final Map<Long, TransactionState> map) throws IOException, Exception {
-      int lv_lockIndex;
-      int cpWrites = 0;
-      long startTime = System.nanoTime();
-      long endTime;
-
-      if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords start with map size " + map.size());
-
-      try {
-        for (Map.Entry<Long, TransactionState> e : map.entrySet()) {
-         try {
-            Long transid = e.getKey();
-            lv_lockIndex = (int)(transid & tLogHashKey);
-            TransactionState value = e.getValue();
-            if (value.getStatus().equals("COMMITTED")){
-               if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords adding record for trans (" + transid + ") : state is " + value.getStatus());
-               cpWrites++;
-               if (forceControlPoint) {
-                  putSingleRecord(transid, value.getCommitId(), value.getStatus(), value.getParticipatingRegions(), true);
-               }
-               else {
-                  putSingleRecord(transid, value.getCommitId(), value.getStatus(), value.getParticipatingRegions(), false);
-               }
-            }
-         }
-         catch (Exception ex) {
-            LOG.error("formatRecord Exception " + ex);
-            ex.printStackTrace();
-            throw ex;
-         }
-        }
-      } catch (ConcurrentModificationException cme){
-          LOG.info("writeControlPointRecords ConcurrentModificationException;  delaying control point ");
-          // Return the current value rather than incrementing this interval.
-          controlPointDeferred = true;
-          return tLogControlPoint.getCurrControlPt() - 1;
-      } 
-
-      endTime = System.nanoTime();
-      if (LOG.isDebugEnabled()) LOG.debug("TLog Control Point Write Report\n" + 
-                   "                        Total records: " 
-                       +  map.size() + " in " + cpWrites + " write operations\n" +
-                   "                        Write time: " + (endTime - startTime) / 1000 + " microseconds\n" );
-  
-      if (LOG.isTraceEnabled()) LOG.trace("writeControlPointRecords exit ");
-      return -1L;
-
-   }
-
-
-   public long addControlPoint (final Map<Long, TransactionState> map) throws IOException, Exception {
-      if (LOG.isTraceEnabled()) LOG.trace("addControlPoint start with map size " + map.size());
-      long lvCtrlPt = 0L;
-      long agedAsn;  // Writes older than this audit seq num will be deleted
-      long lvAsn;    // local copy of the asn
-      long key;
-      boolean success = false;
-
-      if (controlPointDeferred) {
-         // We deferred the control point once already due to concurrency.  We'll synchronize this timeIndex
-         synchronized (map) {
-            if (LOG.isTraceEnabled()) LOG.trace("Writing synchronized control point records");
-            lvAsn = writeControlPointRecords(map);
-         }
-
-         controlPointDeferred = false;
-      }
-      else {
-         lvAsn = writeControlPointRecords(map);
-         if (lvAsn != -1L){
-            return lvAsn;
-         }
-      }
-
-      try {
-         lvAsn = asn.getAndIncrement();
-
-         // Write the control point interval and the ASN to the control point table
-         lvCtrlPt = tLogControlPoint.doControlPoint(lvAsn); 
-
-         if ((lvCtrlPt - 5) > 0){  // We'll keep 5 control points of audit
-            try {
-               agedAsn = tLogControlPoint.getRecord(String.valueOf(lvCtrlPt - 5));
-               if ((agedAsn > 0) && (lvCtrlPt % 5 == 0)){
-                  try {
-                     if (LOG.isTraceEnabled()) LOG.trace("Attempting to remove TLOG writes older than asn " + agedAsn);
-                     deleteAgedEntries(agedAsn);
-                  }
-                  catch (Exception e){
-                     LOG.error("deleteAgedEntries Exception " + e);
-                     throw e;
-                  }
-               }
-               try {
-                  tLogControlPoint.deleteAgedRecords(lvCtrlPt - 5);
-               }
-               catch (Exception e){
-                  if (LOG.isDebugEnabled()) LOG.debug("addControlPoint - control point record not found ");
-               }
-            }
-            catch (IOException e){
-               LOG.error("addControlPoint IOException");
-               e.printStackTrace();
-               throw e;
-            }
-         }
-      } catch (Exception e){
-          LOG.error("addControlPoint Exception " + e);
-          e.printStackTrace();
-          throw e;
-      }
-      if (LOG.isTraceEnabled()) LOG.trace("addControlPoint returning " + lvCtrlPt);
-      return lvCtrlPt;
-   } 
-
-   public void getTransactionState (TransactionState ts) throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("getTransactionState start; transid: " + ts.getTransactionId());
-
-      // This request might be for a transaction not originating on this node, so we need to open
-      // the appropriate Tlog
-      HTableInterface unknownTransactionTable;
-      long lvTransid = ts.getTransactionId();
-      int lv_ownerNid = (int)(lvTransid >> 32);
-      int lv_lockIndex = (int)(lvTransid & tLogHashKey);
-      String lv_tLogName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(lv_ownerNid) + "_LOG_" + Integer.toHexString(lv_lockIndex));
-      HConnection unknownTableConnection = HConnectionManager.createConnection(this.config);
-      unknownTransactionTable = unknownTableConnection.getTable(TableName.valueOf(lv_tLogName));
-
-      try {
-         String transidString = new String(String.valueOf(lvTransid));
-         Get g;
-         long key = (((lvTransid & tLogHashKey) << tLogHashShiftFactor) + (lvTransid & 0xFFFFFFFF));
-         if (LOG.isTraceEnabled()) LOG.trace("key: " + key + ", hexkey: " + Long.toHexString(key) + ", transid: " +  lvTransid);
-         g = new Get(Bytes.toBytes(key));
-         TransState lvTxState = TransState.STATE_NOTX;
-         String stateString = "";
-         String transidToken = "";
-         String commitIdToken = "";
-         try {
-            Result r = unknownTransactionTable.get(g);
-            if (r == null) {
-               if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: tLog result is null: " + transidString);
-            }
-            if (r.isEmpty()) {
-               if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: tLog empty result: " + transidString);
-            }
-            byte [] value = r.getValue(TLOG_FAMILY, ASN_STATE);
-            if (value == null) {
-               ts.setStatus(TransState.STATE_NOTX);
-               if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: tLog value is null: " + transidString);
-               return;
-            }
-            if (value.length == 0) {
-               ts.setStatus(TransState.STATE_NOTX);
-               if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: tLog transaction not found: " + transidString);
-               return;
-            }
-            ts.clearParticipatingRegions();
-            String recordString =  new String (Bytes.toString(value));
-            StringTokenizer st = new StringTokenizer(recordString, ",");
-            if (st.hasMoreElements()) {
-               String asnToken = st.nextElement().toString();
-               transidToken = st.nextElement().toString();
-               stateString = st.nextElement().toString();
-               if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: transaction: " + transidToken + " stateString is: " + stateString);
-            }
-            if (stateString.compareTo("COMMITTED") == 0){
-               lvTxState = TransState.STATE_COMMITTED;
-            }
-            else if (stateString.compareTo("ABORTED") == 0){
-               lvTxState = TransState.STATE_ABORTED;
-            }
-            else if (stateString.compareTo("ACTIVE") == 0){
-               lvTxState = TransState.STATE_ACTIVE;
-            }
-            else if (stateString.compareTo("PREPARED") == 0){
-               lvTxState = TransState.STATE_PREPARED;
-            }
-            else if (stateString.compareTo("NOTX") == 0){
-               lvTxState = TransState.STATE_NOTX;
-            }
-            else if (stateString.compareTo("FORGOTTEN") == 0){
-               // Need to get the previous state record so we know how to drive the regions
-               String keyS = new String(r.getRow());
-               Get get = new Get(r.getRow());
-               get.setMaxVersions(versions);  // will return last n versions of row
-               Result lvResult = unknownTransactionTable.get(get);
-               // byte[] b = lvResult.getValue(TLOG_FAMILY, ASN_STATE);  // returns current version of value
-               List<Cell> list = lvResult.getColumnCells(TLOG_FAMILY, ASN_STATE);  // returns all versions of this column
-               for (Cell element : list) {
-                  String stringValue = new String(CellUtil.cloneValue(element));
-                  st = new StringTokenizer(stringValue, ",");
-                  if (st.hasMoreElements()) {
-                     if (LOG.isTraceEnabled()) LOG.trace("Performing secondary search on (" + transidToken + ")");
-                     String asnToken = st.nextElement().toString() ;
-                     transidToken = st.nextElement().toString() ;
-                     String stateToken = st.nextElement().toString() ;
-                     if ((stateToken.compareTo("COMMITTED") == 0) || (stateToken.compareTo("ABORTED") == 0)) {
-                         String rowKey = new String(r.getRow());
-                         if (LOG.isTraceEnabled()) LOG.trace("Secondary search found record for (" + transidToken + ") with state: " + stateToken);
-                         lvTxState = (stateToken.compareTo("COMMITTED") == 0 ) ? TransState.STATE_COMMITTED : TransState.STATE_ABORTED;
-                         break;
-                     }
-                     else {
-                         if (LOG.isTraceEnabled()) LOG.trace("Secondary search skipping entry for (" + 
-                                    transidToken + ") with state: " + stateToken );
-                     }
-                  }
-               }
-            }
-            else if (stateString.compareTo("ABORTING") == 0){
-               lvTxState = TransState.STATE_ABORTING;
-            }
-            else if (stateString.compareTo("COMMITTING") == 0){
-               lvTxState = TransState.STATE_COMMITTING;
-            }
-            else if (stateString.compareTo("PREPARING") == 0){
-               lvTxState = TransState.STATE_PREPARING;
-            }
-            else if (stateString.compareTo("FORGETTING") == 0){
-               lvTxState = TransState.STATE_FORGETTING;
-            }
-            else if (stateString.compareTo("FORGETTING_HEUR") == 0){
-               lvTxState = TransState.STATE_FORGETTING_HEUR;
-            }
-            else if (stateString.compareTo("BEGINNING") == 0){
-               lvTxState = TransState.STATE_BEGINNING;
-            }
-            else if (stateString.compareTo("HUNGCOMMITTED") == 0){
-               lvTxState = TransState.STATE_HUNGCOMMITTED;
-            }
-            else if (stateString.compareTo("HUNGABORTED") == 0){
-               lvTxState = TransState.STATE_HUNGABORTED;
-            }
-            else if (stateString.compareTo("IDLE") == 0){
-               lvTxState = TransState.STATE_IDLE;
-            }
-            else if (stateString.compareTo("FORGOTTEN_HEUR") == 0){
-               lvTxState = TransState.STATE_FORGOTTEN_HEUR;
-            }
-            else if (stateString.compareTo("ABORTING_PART2") == 0){
-               lvTxState = TransState.STATE_ABORTING_PART2;
-            }
-            else if (stateString.compareTo("TERMINATING") == 0){
-               lvTxState = TransState.STATE_TERMINATING;
-            }
-            else {
-               lvTxState = TransState.STATE_BAD;
-            }
-
-            // get past the filler
-            st.nextElement();
-
-            commitIdToken = st.nextElement().toString();
-            ts.setCommitId(Long.parseLong(commitIdToken));
-
-            // Load the TransactionState object up with regions
-            while (st.hasMoreElements()) {
-               String tableNameToken = st.nextToken();
-               HTable table = new HTable(config, tableNameToken);
-               NavigableMap<HRegionInfo, ServerName> regions = table.getRegionLocations();
-               Iterator<Map.Entry<HRegionInfo, ServerName>> it =  regions.entrySet().iterator();
-               while(it.hasNext()) { // iterate entries.
-                  NavigableMap.Entry<HRegionInfo, ServerName> pairs = it.next();
-                  HRegionInfo regionKey = pairs.getKey();
-                  if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: transaction: " + transidToken + " adding region: " + regionKey.getRegionNameAsString());
-                  ServerName serverValue = regions.get(regionKey);
-                  String hostAndPort = new String(serverValue.getHostAndPort());
-                  StringTokenizer tok = new StringTokenizer(hostAndPort, ":");
-                  String hostName = new String(tok.nextElement().toString());
-                  int portNumber = Integer.parseInt(tok.nextElement().toString());
-                  TransactionRegionLocation loc = new TransactionRegionLocation(regionKey, serverValue);
-                  ts.addRegion(loc);
-              }
-            }
-            ts.setStatus(lvTxState);
-
-            if (LOG.isTraceEnabled()) LOG.trace("getTransactionState: returning transid: " + ts.getTransactionId() + " state: " + lvTxState);
-         } catch (Exception e){
-             LOG.error("getTransactionState Exception " + Arrays.toString(e.getStackTrace()));
-             throw e;
-         }
-      }
-      catch (Exception e2) {
-            LOG.error("getTransactionState Exception2 " + e2);
-            e2.printStackTrace();
-      }
-      if (LOG.isTraceEnabled()) LOG.trace("getTransactionState end transid: " + ts.getTransactionId());
-      return;
-   } 
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/TrafInfo.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/TrafInfo.java b/core/sqf/src/seatrans/tm/hbasetmlib2/TrafInfo.java
deleted file mode 100644
index ab1305b..0000000
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/TrafInfo.java
+++ /dev/null
@@ -1,204 +0,0 @@
-// @@@ START COPYRIGHT @@@
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-// @@@ END COPYRIGHT @@@
-
-package org.trafodion.dtm;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HConnectionManager;
-//H98import org.apache.hadoop.hbase.ipc.HMasterInterface;
-//H98import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
-//H98import org.apache.hadoop.hbase.ipc.HRegionInterface;
-
-
-public class TrafInfo {
-
-    private HBaseAdmin hbadmin;
-    private HConnection connection;
-    Configuration     config;
-    //    HMasterInterface  hmaster;
-
-    public TrafInfo() throws IOException {
-        init();
-    }
-
-    public void init() throws IOException {
-        this.config = HBaseConfiguration.create();
-        this.connection = HConnectionManager.createConnection(config);
-
-        try {
-            hbadmin = new HBaseAdmin(config);
-	    //H98            hmaster = hbadmin.getMaster();
-        } catch(Exception e) {
-            System.out.println("ERROR: Unable to obtain HBase accessors, Exiting");
-            e.printStackTrace();
-            System.exit(1);
-        }
-    }
-
-    public static void printHelp() {
-        System.out.println("Run: $JAVA_HOME/bin/java org.trafodion.dtm.TrafInfo <command>");
-        System.out.println("Commands to gather Transactional Region information:");
-        System.out.println("    active      ::  active transactions per region");
-        System.out.println("    committed   ::  committed transactions per region by sequence number");
-        System.out.println("    indoubt     ::  in-doubt transactions per region");
-        System.out.println("    <command> -v::  shows metadata tables");
-    }
-
-    public void getActivePendingTrans(String transType, String showmd){
-        System.out.println("\n====================================================================");
-        System.out.println("\t\tActive Pending Transactions");
-        getTransactions(transType, showmd);
-    }
-
-    public void getCommittedTransactions(String transType, String showmd){
-        System.out.println("\n====================================================================");
-        System.out.println("\t\tCommitted Transactions by Sequence Number");
-        getTransactions(transType, showmd);
-    }
-
-    public void getInDoubtTransactions(String transType, String showmd){
-        System.out.println("\n====================================================================");
-        System.out.println("\t\tIn-Doubt Transactions");
-        getTransactions(transType, showmd);
-    }
-
-    public void getTransactions(String transType, String showmd){
-        String regionName, tableName;
-        int idx;
-
-	/* H98
-        Collection<ServerName> sn = hmaster.getClusterStatus().getServers();
-        for(ServerName sname : sn) {
-            System.out.println("===================================================================="
-                             + "\nServer Name: " + sname.toString() + "\n"
-                             + "\nTransId    RegionId             TableName");
-
-            try {
-
-                HRegionInterface regionServer = connection.getHRegionConnection(sname.getHostname(), sname.getPort());
-                List<HRegionInfo> regions = regionServer.getOnlineRegions();
-                connection.close();
-
-                TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface)this.connection
-                                                            .getHRegionConnection(sname.getHostname(), sname.getPort());
-
-                for (HRegionInfo rinfo: regions) {
-
-                    regionName = rinfo.getRegionNameAsString();
-                    idx = regionName.indexOf(',');
-                    tableName = regionName.substring(0, idx);
-
-                    if(!showmd.contains("-v")){
-                        if((tableName.contains("TRAFODION._MD_.")) || (tableName.contains("-ROOT-")) ||
-                           (tableName.equals(".META."))){
-                            continue;
-                        }
-                    }
-                    if(tableName.contains("TRAFODION._DTM_.")){
-                        continue;
-                    }
-
-                    System.out.println("--------------------------------------------------------------------"
-                                     + "\n\t   " + rinfo.getRegionId()
-                                     + "\t"      + tableName);
-
-                    if(transType.equals("active")){
-                        List<Long> result = transactionalRegionServer.getPendingTrans(rinfo.getRegionName());
-                        for(Long res : result)
-                            System.out.println(res);
-                    }
-                    else if(transType.equals("committed")){
-                        List<Long> result = transactionalRegionServer.getCommittedTrans(rinfo.getRegionName());
-                        for(Long res : result)
-                            System.out.println(res);
-                    }
-                    else if(transType.equals("indoubt")){
-                        List<Long> result = transactionalRegionServer.getInDoubtTrans(rinfo.getRegionName());
-                            for(Long res : result)
-                            System.out.println(res);
-                    }
-                }
-
-            } catch(Exception e) {
-                System.out.println("ERROR: Unable to get region info, Exiting");
-                e.printStackTrace();
-                System.exit(1);
-            }
-        }
-	*/
-	
-
-    }
-
-    public static void main(String[] args) throws IOException {
-
-        if(args.length == 0) {
-            TrafInfo.printHelp();
-            System.exit(0);
-        }
-
-        TrafInfo ti = new TrafInfo();
-
-        if(args.length == 1){
-            if(args[0].equals("help"))
-                TrafInfo.printHelp();
-            else if(args[0].equals("active"))
-                ti.getActivePendingTrans(args[0], "");
-            else if(args[0].equals("committed"))
-                ti.getCommittedTransactions(args[0], "");
-            else if(args[0].equals("indoubt"))
-                ti.getInDoubtTransactions(args[0], "");
-            else {
-                TrafInfo.printHelp();
-                System.exit(0);
-            }
-        }
-        // Verbose shows Metadata tables
-        else if(args.length == 2){
-            if(args[0].equals("active") && args[1].equals("-v"))
-                ti.getActivePendingTrans(args[0], args[1]);
-            else if(args[0].equals("committed") && args[1].equals("-v"))
-                ti.getCommittedTransactions(args[0], args[1]);
-            else if(args[0].equals("indoubt") && args[1].equals("-v"))
-                ti.getInDoubtTransactions(args[0], args[1]);
-            else {
-                TrafInfo.printHelp();
-                System.exit(0);
-            }
-        }
-        else {
-                TrafInfo.printHelp();
-                System.exit(0);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/pom.xml
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/pom.xml b/core/sqf/src/seatrans/tm/hbasetmlib2/pom.xml
new file mode 100644
index 0000000..17f1fba
--- /dev/null
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+  <repositories>
+    <repository>
+      <id>cloudera</id>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+    </repository>
+  </repositories>
+
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <hbase.version>0.98.1-cdh5.1.0</hbase.version>
+    <hbase-trx.id>hbase-trx-cdh5_3</hbase-trx.id>
+    <java.version>1.7</java.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase.client.transactional</groupId>
+      <artifactId>${hbase-trx.id}</artifactId>
+      <version>${env.TRAFODION_VER}</version>
+      <scope>system</scope>
+      <systemPath>${env.MY_SQROOT}/export/lib/${hbase-trx.id}-${env.TRAFODION_VER}.jar</systemPath>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+  </dependencies>
+
+  <groupId>org.trafodion.sql</groupId>
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>trafodion-dtm</artifactId>
+  <version>${env.TRAFODION_VER}</version>
+  <name>trafodion-dtm</name>
+  <description>Java code for the Trafodion DTM (Distributed Transaction Manager)</description>
+
+  <build>
+    <plugins>
+      <plugin>
+            <groupId>com.github.koraktor</groupId>
+            <artifactId>mavanagaiata</artifactId>
+            <version>0.7.2</version>
+            <configuration>
+                  <dirtyFlag>false</dirtyFlag>
+                  <dirtyIgnoreUntracked>false</dirtyIgnoreUntracked>
+                  <gitDir>${env.MY_SQROOT}/../../.git</gitDir>
+                  <dateFormat>ddMMMyyyy</dateFormat>
+            </configuration>
+            <executions>
+                  <execution>
+                        <id>git-commit</id>
+                        <phase>validate</phase>
+                        <goals>
+                              <goal>commit</goal>
+                              <goal>branch</goal>
+                        </goals>
+                  </execution>
+            </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+              <manifestEntries>
+                    <Implementation-Version-1>Version ${project.version}</Implementation-Version-1>
+                    <Implementation-Version-2>Release ${project.version}</Implementation-Version-2>
+                    <Implementation-Version-3>Build ${env.SQ_BUILD_TYPE}</Implementation-Version-3>
+                    <Implementation-Version-4>[${user.name}]</Implementation-Version-4>
+                    <Implementation-Version-5>Branch ${mvngit.commit.abbrev}-${mvngit.branch}</Implementation-Version-5>
+                    <Implementation-Version-6>Date ${maven.build.timestamp}</Implementation-Version-6>
+                    <Product-Name>${project.name}</Product-Name>
+              </manifestEntries>
+          </archive>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
new file mode 100644
index 0000000..11120ef
--- /dev/null
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
@@ -0,0 +1,416 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.dtm;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
+import org.apache.hadoop.hbase.client.transactional.TransactionRegionLocation;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import java.lang.NullPointerException;
+
+public class HBaseAuditControlPoint {
+
+    static final Log LOG = LogFactory.getLog(HBaseAuditControlPoint.class);
+    private static long currControlPt;
+    private static HBaseAdmin admin;
+    private Configuration config;
+    private static String CONTROL_POINT_TABLE_NAME;
+    private static final byte[] CONTROL_POINT_FAMILY = Bytes.toBytes("cpf");
+    private static final byte[] ASN_HIGH_WATER_MARK = Bytes.toBytes("hwm");
+    private static HTable table;
+    private boolean useAutoFlush;
+    private boolean disableBlockCache;
+
+    public HBaseAuditControlPoint(Configuration config) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("Enter HBaseAuditControlPoint constructor()");
+      this.config = config;
+      CONTROL_POINT_TABLE_NAME = config.get("CONTROL_POINT_TABLE_NAME");
+      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(CONTROL_POINT_TABLE_NAME));
+      HColumnDescriptor hcol = new HColumnDescriptor(CONTROL_POINT_FAMILY);
+
+      disableBlockCache = false;
+      try {
+         String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
+         if (blockCacheString != null){
+            disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
+            if (LOG.isDebugEnabled()) LOG.debug("disableBlockCache != null");
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_DISABLE_BLOCK_CACHE is not in ms.env");
+      }
+      LOG.info("disableBlockCache is " + disableBlockCache);
+      if (disableBlockCache) {
+         hcol.setBlockCacheEnabled(false);
+      }
+
+      desc.addFamily(hcol);
+      admin = new HBaseAdmin(config);
+
+      useAutoFlush = true;
+      try {
+         String autoFlush = System.getenv("TM_TLOG_AUTO_FLUSH");
+         if (autoFlush != null){
+            useAutoFlush = (Integer.parseInt(autoFlush) != 0);
+            if (LOG.isDebugEnabled()) LOG.debug("autoFlush != null");
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_AUTO_FLUSH is not in ms.env");
+      }
+      LOG.info("useAutoFlush is " + useAutoFlush);
+
+      boolean lvControlPointExists = admin.tableExists(CONTROL_POINT_TABLE_NAME);
+      if (LOG.isDebugEnabled()) LOG.debug("HBaseAuditControlPoint lvControlPointExists " + lvControlPointExists);
+      currControlPt = -1;
+      if (lvControlPointExists == false) {
+         try {
+            if (LOG.isDebugEnabled()) LOG.debug("Creating the table " + CONTROL_POINT_TABLE_NAME);
+            admin.createTable(desc);
+            currControlPt = 1;
+         }
+         catch (TableExistsException e) {
+            LOG.error("Table " + CONTROL_POINT_TABLE_NAME + " already exists");
+         }
+      }
+      try {
+         if (LOG.isDebugEnabled()) LOG.debug("try new HTable");
+         table = new HTable(config, desc.getName());
+         table.setAutoFlushTo(this.useAutoFlush);
+      }
+      catch (IOException e) {
+         LOG.error("new HTable IOException");
+      }
+
+      if (currControlPt == -1){
+         try {
+            currControlPt = getCurrControlPt();
+         }
+         catch (Exception e2) {
+            if (LOG.isDebugEnabled()) LOG.debug("Exit getCurrControlPoint() exception " + e2);
+         }
+      }
+      if (LOG.isDebugEnabled()) LOG.debug("currControlPt is " + currControlPt);
+
+      if (LOG.isTraceEnabled()) LOG.trace("Exit constructor()");
+      return;
+    }
+
+   public long getCurrControlPt() throws Exception {
+      if (LOG.isTraceEnabled()) LOG.trace("getCurrControlPt:  start");
+      long highKey = -1;
+      if (LOG.isDebugEnabled()) LOG.debug("new Scan");
+      Scan s = new Scan();
+      s.setCaching(10);
+      s.setCacheBlocks(false);
+      if (LOG.isDebugEnabled()) LOG.debug("resultScanner");
+      ResultScanner ss = table.getScanner(s);
+      try {
+         long currKey;
+         String rowKey;
+         if (LOG.isDebugEnabled()) LOG.debug("entering for loop" );
+         for (Result r : ss) {
+            rowKey = new String(r.getRow());
+            if (LOG.isDebugEnabled()) LOG.debug("rowKey is " + rowKey );
+            currKey = Long.parseLong(rowKey);
+            if (LOG.isDebugEnabled()) LOG.debug("value is " + Long.parseLong(Bytes.toString(r.value())));
+            if (currKey > highKey) {
+               if (LOG.isDebugEnabled()) LOG.debug("Setting highKey to " + currKey);
+               highKey = currKey;
+            }
+         }
+      }
+      catch (Exception e) {
+        LOG.error("getCurrControlPt IOException" + e);
+        e.printStackTrace();
+      } finally {
+         ss.close();
+      }
+      if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt returning " + highKey);
+      return highKey;
+   }
+
+   public long putRecord(final long ControlPt, final long startingSequenceNumber) throws Exception {
+      if (LOG.isTraceEnabled()) LOG.trace("putRecord starting sequence number ("  + String.valueOf(startingSequenceNumber) + ")");
+      String controlPtString = new String(String.valueOf(ControlPt));
+      Put p = new Put(Bytes.toBytes(controlPtString));
+      p.add(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK, Bytes.toBytes(String.valueOf(startingSequenceNumber)));
+      try {
+         if (LOG.isTraceEnabled()) LOG.trace("try table.put with starting sequence number " + startingSequenceNumber);
+         table.put(p);
+         if (useAutoFlush == false) {
+            if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record");
+            table.flushCommits();
+         }
+      }
+      catch (Exception e) {
+         LOG.error("HBaseAuditControlPoint:putRecord Exception" + e);
+         throw e;
+      }
+      if (LOG.isTraceEnabled()) LOG.trace("HBaseAuditControlPoint:putRecord returning " + ControlPt);
+      return ControlPt;
+   }
+
+   public ArrayList<String> getRecordList(String controlPt) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("getRecord");
+      ArrayList<String> transactionList = new ArrayList<String>();
+      Get g = new Get(Bytes.toBytes(controlPt));
+      Result r = table.get(g);
+      byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
+      String recordString = new String(currValue);
+      if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
+      StringTokenizer st = new StringTokenizer(recordString, ",");
+      while (st.hasMoreElements()) {
+        String token = st.nextElement().toString() ;
+        if (LOG.isDebugEnabled()) LOG.debug("token is " + token);
+        transactionList.add(token);
+      }
+
+      if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit with list size (" + transactionList.size() + ")");
+      return transactionList;
+
+    }
+
+   public long getRecord(final String controlPt) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("getRecord " + controlPt);
+      long lvValue = -1;
+      Get g = new Get(Bytes.toBytes(controlPt));
+      String recordString;
+      try {
+         Result r = table.get(g);
+         byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
+         try {
+            recordString = new String (Bytes.toString(currValue));
+            if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
+            lvValue = Long.parseLong(recordString, 10);
+         }
+         catch (NullPointerException e){
+            if (LOG.isDebugEnabled()) LOG.debug("control point " + controlPt + " is not in the table");
+         }
+      }
+      catch (IOException e){
+          LOG.error("getRecord IOException");
+          throw e;
+      }
+      if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit " + lvValue);
+      return lvValue;
+
+    }
+
+   public long getStartingAuditSeqNum() throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum");
+      String controlPtString = new String(String.valueOf(currControlPt));
+      long lvAsn;
+      if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum new get for control point " + currControlPt);
+      Get g = new Get(Bytes.toBytes(controlPtString));
+      if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum setting result");
+      Result r = table.get(g);
+      if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum currValue CONTROL_POINT_FAMILY is "
+                 + CONTROL_POINT_FAMILY + " ASN_HIGH_WATER_MARK " + ASN_HIGH_WATER_MARK);
+      byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
+      if (LOG.isDebugEnabled()) LOG.debug("Starting asn setting recordString ");
+      String recordString = "";
+      try {
+         recordString = new String(currValue);
+      }
+      catch (NullPointerException e) {
+         if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum recordString is null");
+         lvAsn = 1;
+         if (LOG.isDebugEnabled()) LOG.debug("Starting asn is 1");
+         return lvAsn;
+      }
+      if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum recordString is good");
+      if (LOG.isDebugEnabled()) LOG.debug("Starting asn for control point " + currControlPt + " is " + recordString);
+      lvAsn = Long.valueOf(recordString);
+      if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum - exit returning " + lvAsn);
+      return lvAsn;
+    }
+
+   public long getNextAuditSeqNum(int nid) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum for node: " + nid);
+
+      // We need to open the appropriate control point table and read the value from it
+      HTableInterface remoteTable;
+      String lv_tName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(nid) + "_CONTROL_POINT");
+      HConnection remoteConnection = HConnectionManager.createConnection(this.config);
+      remoteTable = remoteConnection.getTable(TableName.valueOf(lv_tName));
+
+      long highValue = -1;
+      try {
+         Scan s = new Scan();
+         s.setCaching(10);
+         s.setCacheBlocks(false);
+         if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum resultScanner");
+         ResultScanner ss = remoteTable.getScanner(s);
+         try {
+            long currValue;
+            String rowKey;
+            if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum entering for loop" );
+            for (Result r : ss) {
+               rowKey = new String(r.getRow());
+               if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum rowKey is " + rowKey );
+               currValue =  Long.parseLong(Bytes.toString(r.value()));
+               if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum value is " + currValue);
+               if (currValue > highValue) {
+                  if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum Setting highValue to " + currValue);
+                  highValue = currValue;
+               }
+            }
+         }
+         catch (Exception e) {
+           LOG.error("getNextAuditSeqNum IOException" + e);
+           e.printStackTrace();
+         } finally {
+            ss.close();
+         }
+      }
+      catch (IOException e) {
+         LOG.error("getNextAuditSeqNum IOException setting up scan for " + lv_tName);
+         e.printStackTrace();
+      }
+      finally {
+         try {
+            remoteTable.close();
+            remoteConnection.close();
+         }
+         catch (IOException e) {
+            LOG.error("getNextAuditSeqNum IOException closing table or connection for " + lv_tName);
+            e.printStackTrace();
+         }
+      }
+      if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum returning " + (highValue + 1));
+      return (highValue + 1);
+    }
+
+
+   public long doControlPoint(final long sequenceNumber) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("doControlPoint start");
+      try {
+         currControlPt++;
+         if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), sequenceNumber (" + sequenceNumber+ ") try putRecord");
+         putRecord(currControlPt, sequenceNumber);
+      }
+      catch (Exception e) {
+         LOG.error("doControlPoint Exception" + e);
+      }
+
+      if (LOG.isTraceEnabled()) LOG.trace("doControlPoint - exit");
+      return currControlPt;
+   }
+
+   public boolean deleteRecord(final long controlPoint) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("deleteRecord start for control point " + controlPoint);
+      String controlPtString = new String(String.valueOf(controlPoint));
+
+      try {
+         List<Delete> list = new ArrayList<Delete>();
+         Delete del = new Delete(Bytes.toBytes(controlPtString));
+         if (LOG.isDebugEnabled()) LOG.debug("deleteRecord  (" + controlPtString + ") ");
+         table.delete(del);
+      }
+      catch (Exception e) {
+         LOG.error("deleteRecord IOException");
+      }
+
+      if (LOG.isTraceEnabled()) LOG.trace("deleteRecord - exit");
+      return true;
+   }
+
+   public boolean deleteAgedRecords(final long controlPoint) throws IOException {
+      if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords start - control point " + controlPoint);
+      String controlPtString = new String(String.valueOf(controlPoint));
+
+      Scan s = new Scan();
+      s.setCaching(10);
+      s.setCacheBlocks(false);
+      ArrayList<Delete> deleteList = new ArrayList<Delete>();
+      ResultScanner ss = table.getScanner(s);
+      try {
+         String rowKey;
+         for (Result r : ss) {
+            rowKey = new String(r.getRow());
+            if (Long.parseLong(rowKey) < controlPoint) {
+               if (LOG.isDebugEnabled()) LOG.debug("Adding  (" + rowKey + ") to delete list");
+               Delete del = new Delete(rowKey.getBytes());
+               deleteList.add(del);
+            }
+         }
+         if (LOG.isDebugEnabled()) LOG.debug("attempting to delete list with " + deleteList.size() + " elements");
+         table.delete(deleteList);
+      }
+      catch (Exception e) {
+         LOG.error("deleteAgedRecords IOException");
+      }finally {
+         ss.close();
+      }
+
+      if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords - exit");
+      return true;
+   }
+}
+