You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by sa...@apache.org on 2017/05/12 20:25:38 UTC

[03/22] incubator-trafodion git commit: This is a large contribution of changes from the Esgyn TransactionManager and libraries that are collectively much better tested and hardened than Trafodion, but are too numerous and complex to cherry-pick individually.

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index d7be3ea..21ca319 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -94,6 +94,7 @@ public class HBaseTxClient {
    private int stallWhere;
    private IdTm idServer;
    private static final int ID_TM_SERVER_TIMEOUT = 1000;
+   private static int myClusterId = 0;
 
    public enum AlgorithmType{
      MVCC, SSCC
@@ -180,7 +181,7 @@ public class HBaseTxClient {
       try {
         trxManager = TransactionManager.getInstance(config, connection);
       } catch (IOException e ){
-          LOG.error("Unable to create TransactionManager, throwing exception", e);
+          LOG.error("trxManager Initialization failure throwing exception", e);
           throw e;
       }
 
@@ -300,7 +301,9 @@ public class HBaseTxClient {
                                            this,
                                            useForgotten,
                                            forceForgotten,
-                                           useTlog);
+                                           useTlog,
+                                           false,
+                                           false);
           recovThread.start();
       }
       if (LOG.isTraceEnabled()) LOG.trace("Exit init()");
@@ -328,7 +331,9 @@ public class HBaseTxClient {
                                                    this,
                                                    useForgotten,
                                                    forceForgotten,
-                                                   useTlog);
+                                                   useTlog,
+                                                   false,
+                                                   true);
                newRecovThread.start();
                mapRecoveryThreads.put(nodeID, recovThread);
                if(LOG.isTraceEnabled()) LOG.trace("nodeDown -- mapRecoveryThreads size: " + mapRecoveryThreads.size());
@@ -366,6 +371,10 @@ public class HBaseTxClient {
       return TransReturnCode.RET_OK.getShort();
    }
 
+   public static Map<Long, TransactionState> getMap() {
+     return mapTransactionStates;
+   }
+
    public long beginTransaction(final long transactionId) throws IOException {
 
       if (LOG.isTraceEnabled()) LOG.trace("Enter beginTransaction, txid: " + transactionId);
@@ -383,11 +392,11 @@ public class HBaseTxClient {
          if (tx2 != null) {
             // Some other thread added the transaction while we were creating one.  It's already in the
             // map, so we can use the existing one.
-            if (LOG.isDebugEnabled()) LOG.debug("HBaseTxClient:beginTransaction, found TransactionState object while creating a new one " + tx2);
+            if (LOG.isDebugEnabled()) LOG.debug("beginTransaction, found TransactionState object while creating a new one " + tx2);
             tx = tx2;
          }
          else {
-            if (LOG.isDebugEnabled()) LOG.debug("HBaseTxClient:beginTransaction, adding new TransactionState to map " + tx);
+            if (LOG.isDebugEnabled()) LOG.debug("beginTransaction, adding new TransactionState to map " + tx);
             mapTransactionStates.put(transactionId, tx);
          }
       }
@@ -397,7 +406,7 @@ public class HBaseTxClient {
    }
 
    public short abortTransaction(final long transactionID) throws IOException {
-      if (LOG.isDebugEnabled()) LOG.debug("Enter abortTransaction, txid: " + transactionID);
+      if (LOG.isDebugEnabled()) LOG.debug("Enter abortTransaction, txId: " + transactionID);
       TransactionState ts = mapTransactionStates.get(transactionID);
 
       if(ts == null) {
@@ -408,10 +417,11 @@ public class HBaseTxClient {
       try {
          ts.setStatus(TransState.STATE_ABORTED);
          if (useTlog) {
-            tLog.putSingleRecord(transactionID, -1, "ABORTED", ts.getParticipatingRegions(), false);
+            tLog.putSingleRecord(transactionID, ts.getStartId(), -1, TransState.STATE_ABORTED.toString(), ts.getParticipatingRegions(), ts.hasPlaceHolder(), false); //force flush
          }
-      } catch(IOException e) {
-         LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " tLog.putRecord: EXCEPTION", e);
+      }
+      catch(IOException e) {
+         LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " tLog.putRecord: EXCEPTION ", e);
          return TransReturnCode.RET_EXCEPTION.getShort();
       }
 
@@ -465,12 +475,7 @@ public class HBaseTxClient {
       }
 
       if (useTlog && useForgotten) {
-         if (forceForgotten) {
-            tLog.putSingleRecord(transactionID, -1, "FORGOTTEN", null, true);
-         }
-         else {
-            tLog.putSingleRecord(transactionID, -1, "FORGOTTEN", null, false);
-         }
+            tLog.putSingleRecord(transactionID, ts.getStartId(), -1, TransState.STATE_FORGOTTEN_ABORT.toString(), ts.getParticipatingRegions(), ts.hasPlaceHolder(), forceForgotten); // forced flush?
       }
  //     mapTransactionStates.remove(transactionID);
 
@@ -481,11 +486,17 @@ public class HBaseTxClient {
    public short prepareCommit(long transactionId) throws 
                                                  TransactionManagerException,
                                                  IOException{
-     if (LOG.isDebugEnabled()) LOG.debug("Enter prepareCommit, txid: " + transactionId);
-     if (LOG.isTraceEnabled()) LOG.trace("mapTransactionStates " + mapTransactionStates + " entries " + mapTransactionStates.size());
-        TransactionState ts = mapTransactionStates.get(transactionId);
-     if(ts == null) {
-       LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOTX.toString());
+     if (LOG.isDebugEnabled()) LOG.debug("Enter prepareCommit"
+					 + ", txId: " + transactionId
+					 + ", #txstate entries " + mapTransactionStates.size()
+					 );
+     TransactionState ts = mapTransactionStates.get(transactionId);
+     
+     if (ts == null) {
+       LOG.error("Returning from prepareCommit" 
+		 + ", txId: " + transactionId 
+		 + ", retval: " + TransReturnCode.RET_NOTX.toString()
+		 );
        return TransReturnCode.RET_NOTX.getShort();
      }
 
@@ -494,13 +505,13 @@ public class HBaseTxClient {
         if (LOG.isDebugEnabled()) LOG.debug("prepareCommit, [ " + ts + " ], result " + result + ((result == TransactionalReturn.COMMIT_OK_READ_ONLY)?", Read-Only":""));
         switch (result) {
           case TransactionalReturn.COMMIT_OK:
-             if (LOG.isTraceEnabled()) LOG.trace("Exit OK prepareCommit, txid: " + transactionId);
+             if (LOG.isTraceEnabled()) LOG.trace("Exit OK prepareCommit, txId: " + transactionId);
              return TransReturnCode.RET_OK.getShort();
           case TransactionalReturn.COMMIT_OK_READ_ONLY:
              synchronized(mapLock) {
                 mapTransactionStates.remove(transactionId);
              }
-             if (LOG.isTraceEnabled()) LOG.trace("Exit OK_READ_ONLY prepareCommit, txid: " + transactionId);
+             if (LOG.isTraceEnabled()) LOG.trace("Exit OK_READ_ONLY prepareCommit, txId: " + transactionId);
              return TransReturnCode.RET_READONLY.getShort();
           case TransactionalReturn.COMMIT_UNSUCCESSFUL:
              if(!ts.getRecordedException().isEmpty())
@@ -530,28 +541,32 @@ public class HBaseTxClient {
        throw t;
      } 
      catch (CommitUnsuccessfulException e) {
-       LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException", e);
+       LOG.error("Returning from prepareCommit, txId: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException");
        throw new TransactionManagerException(e,
                                    TransReturnCode.RET_NOCOMMITEX.getShort());
      }
      catch (IOException e) {
-       LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException", e);
+       LOG.error("Returning from prepareCommit, txId: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException");
        throw new TransactionManagerException(e,
                                    TransReturnCode.RET_IOEXCEPTION.getShort());
      }
    }
 
-   public short doCommit(long transactionId) throws IOException, CommitUnsuccessfulException {
-       if (LOG.isDebugEnabled()) LOG.debug("Enter doCommit, txid: " + transactionId);
-       TransactionState ts = mapTransactionStates.get(transactionId);
+   public short doCommit(long transactionId) throws IOException {
+      if (LOG.isDebugEnabled()) LOG.debug("Enter doCommit, txId: " + transactionId);
+      TransactionState ts = mapTransactionStates.get(transactionId);
 
-       if(ts == null) {
-      LOG.error("Returning from HBaseTxClient:doCommit, (null tx) retval: " + TransReturnCode.RET_NOTX.toString() + " txid: " + transactionId);
+      if(ts == null) {
+	      LOG.error("Returning from doCommit, (null tx) retval: " 
+			+ TransReturnCode.RET_NOTX.toString() 
+			+ ", txId: " + transactionId
+			);
           return TransReturnCode.RET_NOTX.getShort();
        }
 
        // Set the commitId
        IdTmId commitId = null;
+       long commitIdVal = -1;
        if (TRANSACTION_ALGORITHM == AlgorithmType.SSCC) {
           try {
              commitId = new IdTmId();
@@ -562,24 +577,39 @@ public class HBaseTxClient {
              LOG.error("doCommit: IdTm threw exception " ,  exc);
              throw new CommitUnsuccessfulException("doCommit: IdTm threw exception " ,  exc);
           }
+          commitIdVal = commitId.val;
        }
-
-       final long commitIdVal = (TRANSACTION_ALGORITHM == AlgorithmType.SSCC) ? commitId.val : -1;
        if (LOG.isTraceEnabled()) LOG.trace("doCommit setting commitId (" + commitIdVal + ") for tx: " + ts.getTransactionId());
        ts.setCommitId(commitIdVal);
 
-       try {
-          ts.setStatus(TransState.STATE_COMMITTED);
-          if (useTlog) {
-             tLog.putSingleRecord(transactionId, commitIdVal, "COMMITTED", ts.getParticipatingRegions(), true);
-          }
-       } catch(IOException e) {
-          LOG.error("Returning from HBaseTxClient:doCommit, txid: " + transactionId + " tLog.putRecord: EXCEPTION ", e);
-          return TransReturnCode.RET_EXCEPTION.getShort();
+       if (stallWhere == 4) {
+    	  if (LOG.isInfoEnabled())LOG.info("Stalling in phase 2a (before TLOG write) for doCommit for transaction: " + transactionId);
+          boolean loopBack = false;
+          do
+          {
+             try {
+                loopBack = false;
+                Thread.sleep(600000); // Stall for 10 min (600000 ms); the loop re-sleeps if interrupted
+             } catch (InterruptedException ie) {
+                loopBack = true;
+             }
+          } while (loopBack);
        }
 
+       //try {
+          ts.setStatus(TransState.STATE_COMMITTING);
+          if (useTlog) {
+             try {
+                tLog.putSingleRecord(transactionId, ts.getStartId(), commitIdVal, TransState.STATE_COMMITTED.toString(), ts.getParticipatingRegions(), ts.hasPlaceHolder(), true);
+                ts.setStatus(TransState.STATE_COMMITTED);
+             }
+             catch (IOException e) {
+                 LOG.error("doCommit: Local TLOG write threw exception during commit " , e);
+                 throw new RuntimeException(e);
+             }
+          }
        if ((stallWhere == 2) || (stallWhere == 3)) {
-          LOG.info("Stalling in phase 2 for doCommit");
+    	  if (LOG.isInfoEnabled())LOG.info("Stalling in phase 2 for doCommit for transaction: " + transactionId);
           boolean loopBack = false;
           do {
              try {
@@ -592,13 +622,15 @@ public class HBaseTxClient {
        }
 
        try {
+          if (LOG.isTraceEnabled()) LOG.trace("doCommit, calling trxManager.doCommit(" + ts.getTransactionId() + ")" );
           trxManager.doCommit(ts);
        } catch (CommitUnsuccessfulException e) {
-          LOG.error("Returning from HBaseTxClient:doCommit, retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException" + " txid: " + transactionId, e);
+          LOG.error("Returning from doCommit, transaction: " + transactionId
+        		      + ", retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException");
           return TransReturnCode.RET_EXCEPTION.getShort();
        }
        catch (UnsuccessfulDDLException ddle) {
-          LOG.error("FATAL DDL Exception from HBaseTxClient:doCommit, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionId, ddle);
+          LOG.error("FATAL DDL Exception from doCommit, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txId: " + transactionId);
 
           //Reaching here means several attempts to perform the DDL operation has failed in commit phase.
           //Generally if only DML operation is involved, returning error causes TM to call completeRequest()
@@ -616,7 +648,9 @@ public class HBaseTxClient {
                    loopBack = false;
                    commitDDLLock.wait();
                 } catch(InterruptedException ie) {
-                    LOG.warn("Interrupting commitDDLLock.wait,  but retrying ", ie);
+                    LOG.warn("Interrupting commitDDLLock.wait" 
+			     + ", txId: " + transactionId
+			     + ", retrying ", ie);
                     loopBack = true;
                 }
              } while (loopBack);
@@ -624,38 +658,37 @@ public class HBaseTxClient {
           return TransReturnCode.RET_EXCEPTION.getShort();
        }
        if (useTlog && useForgotten) {
-          if (forceForgotten) {
-             tLog.putSingleRecord(transactionId, commitIdVal, "FORGOTTEN", null, true);
-          }
-          else {
-             tLog.putSingleRecord(transactionId, commitIdVal, "FORGOTTEN", null, false);
-          }
+          tLog.putSingleRecord(transactionId, ts.getStartId(), commitIdVal, TransState.STATE_FORGOTTEN_COMMITTED.toString(), ts.getParticipatingRegions(), ts.hasPlaceHolder(), forceForgotten); // forced flush?
        }
-//       mapTransactionStates.remove(transactionId);
-
        if (LOG.isTraceEnabled()) LOG.trace("Exit doCommit, retval(ok): " + TransReturnCode.RET_OK.toString() +
-                         " txid: " + transactionId + " mapsize: " + mapTransactionStates.size());
+                         " txId: " + transactionId + " mapsize: " + mapTransactionStates.size());
 
        return TransReturnCode.RET_OK.getShort();
    }
 
-   public short completeRequest(long transactionId) throws IOException, CommitUnsuccessfulException {
-     if (LOG.isDebugEnabled()) LOG.debug("Enter completeRequest, txid: " + transactionId);
+    public short completeRequest(long transactionId)
+	throws IOException, CommitUnsuccessfulException 
+    {
+     if (LOG.isDebugEnabled()) LOG.debug("Enter completeRequest" 
+					 + ", txId: " + transactionId
+					 );
      TransactionState ts = mapTransactionStates.get(transactionId);
 
-     if(ts == null) {
-          LOG.error("Returning from HBaseTxClient:completeRequest, (null tx) retval: " + TransReturnCode.RET_NOTX.toString() + " txid: " + transactionId);
-          return TransReturnCode.RET_NOTX.getShort();
-       }
-  
+     if (ts == null) {
+	     LOG.error("Returning from completeRequest, (null tx) retval: " 
+		       + TransReturnCode.RET_NOTX.toString() 
+		       + ", txId: " + transactionId
+		       );
+	 return TransReturnCode.RET_NOTX.getShort();
+     }
+
        boolean loopBack = false;
        do {
           try {
-             if (LOG.isTraceEnabled()) LOG.trace("TEMP completeRequest Calling CompleteRequest() Txid :" + transactionId);
              loopBack = false;
              ts.completeRequest();
           } catch(InterruptedException ie) {
-              LOG.warn("Interrupting HBaseTxClient:completeRequest but retrying, ts.completeRequest: txid: " + transactionId + ", EXCEPTION: ", ie);
+              LOG.warn("Interrupting completeRequest but retrying, ts.completeRequest: txid: " + transactionId + ", EXCEPTION: ", ie);
               loopBack = true;
           } 
        } while (loopBack);
@@ -681,11 +714,8 @@ public class HBaseTxClient {
        commitErr = doCommit(transactionId);
        if (commitErr != TransReturnCode.RET_OK.getShort()) {
          LOG.error("doCommit for committed transaction " + transactionId + " failed with error " + commitErr);
-         // It is a violation of 2 PC protocol to try to abort the transaction after prepare
+         // It is a violation of 2 PC protocol to try to abort the transaction after commit write
          return commitErr;
-//         abortErr = abortTransaction(transactionId);
-//         if (LOG.isDebugEnabled()) LOG.debug("tryCommit commit failed and was aborted. Commit error " +
-//                   commitErr + ", Abort error " + abortErr);
        }
 
        if (LOG.isTraceEnabled()) LOG.trace("TEMP tryCommit Calling CompleteRequest() Txid :" + transactionId);
@@ -704,7 +734,7 @@ public class HBaseTxClient {
        mapTransactionStates.remove(transactionId);
     }
 
-    if (LOG.isDebugEnabled()) LOG.debug("Exit completeRequest txid: " + transactionId + " mapsize: " + mapTransactionStates.size());
+    if (LOG.isDebugEnabled()) LOG.debug("Exit completeRequest txId: " + transactionId + " mapsize: " + mapTransactionStates.size());
     return TransReturnCode.RET_OK.getShort();
   }
 
@@ -713,11 +743,11 @@ public class HBaseTxClient {
       TransactionState ts;
       HTableDescriptor htdesc = null;
 
-      if (LOG.isTraceEnabled()) LOG.trace("Enter callCreateTable, txid: [" + transactionId + "],  htbldesc bytearray: " + pv_htbldesc + "desc in hex: " + Hex.encodeHexString(pv_htbldesc));
+      if (LOG.isTraceEnabled()) LOG.trace("Enter callCreateTable, txId: [" + transactionId + "],  htbldesc bytearray: " + pv_htbldesc + "desc in hex: " + Hex.encodeHexString(pv_htbldesc));
 
       ts = mapTransactionStates.get(transactionId);
       if(ts == null) {
-         LOG.error("Returning from HBaseTxClient:callCreateTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txid: " + transactionId);
+         LOG.error("Returning from callCreateTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txId: " + transactionId);
          return TransReturnCode.RET_NOTX.getShort();
       }
       try {
@@ -745,11 +775,11 @@ public class HBaseTxClient {
       TransactionState ts;
       String strTblName = new String(pv_tblname, "UTF-8");
 
-      if (LOG.isTraceEnabled()) LOG.trace("Enter callAlterTable, txid: [" + transactionId + "],  tableName: " + strTblName);
+      if (LOG.isTraceEnabled()) LOG.trace("Enter callAlterTable, txId: [" + transactionId + "],  tableName: " + strTblName);
 
       ts = mapTransactionStates.get(transactionId);
       if(ts == null) {
-         LOG.error("Returning from HBaseTxClient:callAlterTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txid: " + transactionId);
+         LOG.error("Returning from callAlterTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txId: " + transactionId);
          return TransReturnCode.RET_NOTX.getShort();
       }
 
@@ -762,11 +792,11 @@ public class HBaseTxClient {
       TransactionState ts;
       String strTblName = new String(pv_tblname, "UTF-8");
 
-      if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterTruncateOnAbort, txid: [" + transactionId + "],  tablename: " + strTblName);
+      if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterTruncateOnAbort, txId: [" + transactionId + "],  tablename: " + strTblName);
 
       ts = mapTransactionStates.get(transactionId);
       if(ts == null) {
-         LOG.error("Returning from HBaseTxClient:callRegisterTruncateOnAbort, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txid: " + transactionId);
+         LOG.error("Returning from callRegisterTruncateOnAbort, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txId: " + transactionId);
          return TransReturnCode.RET_NOTX.getShort();
       }
 
@@ -779,11 +809,11 @@ public class HBaseTxClient {
       TransactionState ts;
       String strTblName = new String(pv_tblname, "UTF-8");
 
-      if (LOG.isTraceEnabled()) LOG.trace("Enter callDropTable, txid: [" + transactionId + "],  tablename: " + strTblName);
+      if (LOG.isTraceEnabled()) LOG.trace("Enter callDropTable, txId: [" + transactionId + "],  tablename: " + strTblName);
 
       ts = mapTransactionStates.get(transactionId);
       if(ts == null) {
-         LOG.error("Returning from HBaseTxClient:callDropTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txid: " + transactionId);
+         LOG.error("Returning from callDropTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txId: " + transactionId);
          return TransReturnCode.RET_NOTX.getShort();
       }
 
@@ -798,8 +828,14 @@ public class HBaseTxClient {
                                     long pv_startcode,
                                     byte[] pv_regionInfo) throws IOException {
        String hostname    = new String(pv_hostname);
-       if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterRegion, txid: [" + transactionId + "], startId: " + startId + ", port: "
-           + pv_port + ", hostname: " + hostname + ", reg info len: " + pv_regionInfo.length + " " + new String(pv_regionInfo, "UTF-8"));
+       if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterRegion, "
+					   + "txId: [" + transactionId + "]" 
+					   + ", startId: " + startId 
+					   + ", port: " + pv_port 
+					   + ", hostname: " + hostname 
+					   + ", startcode: " + pv_startcode 
+					   + ", reg info len: " + pv_regionInfo.length 
+					   + " " + new String(pv_regionInfo, "UTF-8"));
 
        HRegionInfo lv_regionInfo;
        try {
@@ -818,7 +854,7 @@ public class HBaseTxClient {
 
        TransactionState ts = mapTransactionStates.get(transactionId);
        if(ts == null) {
-          if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion transactionId (" + transactionId +
+          if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion transactionId (" + transactionId +
                    ") not found in mapTransactionStates of size: " + mapTransactionStates.size());
           try {
              ts = trxManager.beginTransaction(transactionId);
@@ -831,38 +867,35 @@ public class HBaseTxClient {
              if (ts2 != null) {
                 // Some other thread added the transaction while we were creating one.  It's already in the
                 // map, so we can use the existing one.
-                if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion, found TransactionState object while creating a new one " + ts2);
+                if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion, found TransactionState object while creating a new one " + ts2);
                 ts = ts2;
              }
              else {
                 ts.setStartId(startId);
-                if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion new transactionState created: " + ts );
+                if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion new transactionState created: " + ts );
              }
           }// end synchronized
        }
        else {
-          if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion existing transactionState found: " + ts );
+          if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion existing transactionState found: " + ts );
           if (ts.getStartId() == -1) {
             ts.setStartId(startId);
-            if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion reset startId for transactionState: " + ts );
+            if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion reset startId for transactionState: " + ts );
           }
        }
 
        try {
           trxManager.registerRegion(ts, regionLocation);
        } catch (IOException e) {
-          LOG.error("HBaseTxClient:callRegisterRegion exception in registerRegion call, txid: " + transactionId +
-            " retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException " , e);
+          LOG.error("callRegisterRegion exception in registerRegion call, txId: " + transactionId +
+            " retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException " + e);
           return TransReturnCode.RET_EXCEPTION.getShort();
        }
 
        if (LOG.isDebugEnabled()) LOG.debug("RegisterRegion adding table name " + regionTableName);
        ts.addTableName(regionTableName);
 
-       // Removing unnecessary put back into the map
-       // mapTransactionStates.put(ts.getTransactionId(), ts);
-
-       if (LOG.isTraceEnabled()) LOG.trace("Exit callRegisterRegion, txid: [" + transactionId + "] with mapsize: "
+       if (LOG.isTraceEnabled()) LOG.trace("Exit callRegisterRegion, txId: [" + transactionId + "] with mapsize: "
                   + mapTransactionStates.size());
        return TransReturnCode.RET_OK.getShort();
    }
@@ -893,7 +926,7 @@ public class HBaseTxClient {
       if (LOG.isTraceEnabled()) LOG.trace("Enter addControlPoint");
       long result = 0L;
       if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient calling tLog.addControlPoint with mapsize " + mapTransactionStates.size());
-      result = tLog.addControlPoint(mapTransactionStates);
+      result = tLog.addControlPoint(myClusterId, mapTransactionStates, true);
       Long lowestStartId = Long.MAX_VALUE;
       for(ConcurrentHashMap.Entry<Long, TransactionState> entry : mapTransactionStates.entrySet()){
           TransactionState value;
@@ -1019,6 +1052,8 @@ public class HBaseTxClient {
              private boolean useForgotten;
              private boolean forceForgotten;
              private boolean useTlog;
+             private boolean leadtm;
+             private boolean takeover;
              HBaseTxClient hbtx;
              private static int envSleepTimeInt;
              private boolean ddlOnlyRecoveryCheck = true;
@@ -1038,36 +1073,43 @@ public class HBaseTxClient {
                                HBaseTxClient hbtx,
                                boolean useForgotten,
                                boolean forceForgotten,
-                               boolean useTlog) {
+                               boolean useTlog,
+                               boolean leadtm,
+                               boolean takeover) {
              this(audit, zookeeper, txnManager);
              this.hbtx = hbtx;
              this.useForgotten = useForgotten;
              this.forceForgotten = forceForgotten;
              this.useTlog= useTlog;
-         }
+             this.leadtm = leadtm;
+             this.takeover = takeover;
+             if(LOG.isDebugEnabled()) LOG.debug("Traf Recovery Thread starts for DTM " + tmID +
+                             " LDTM " + leadtm + " Takeover " + takeover);
+
+            }
              /**
               *
               * @param audit
               * @param zookeeper
               * @param txnManager
               */
-             public RecoveryThread(TmAuditTlog audit,
+            public RecoveryThread(TmAuditTlog audit,
                                    HBaseTmZK zookeeper,
                                    TransactionManager txnManager)
-             {
+            {
                           this.audit = audit;
                           this.zookeeper = zookeeper;
                           this.txnManager = txnManager;
                           this.inDoubtList = new HashSet<Long> ();
                           this.tmID = zookeeper.getTMID();
                           this.sleepTimeInt = envSleepTimeInt;
-             }
+            }
 
-             public void stopThread() {
+            public void stopThread() {
                  this.continueThread = false;
-             }
+            }
 
-             private void addRegionToTS(String hostnamePort, byte[] regionInfo, TransactionState ts) throws IOException{
+            private void addRegionToTS(String hostnamePort, byte[] regionInfo, TransactionState ts) throws IOException{
                  HRegionInfo regionInfoLoc; // = new HRegionInfo();
                  final byte [] delimiter = ",".getBytes();
                  String[] result = hostnamePort.split(new String(delimiter), 3);
@@ -1141,10 +1183,10 @@ public class HBaseTxClient {
                             if(transactionStates != null)
                                 recoverTransactions(transactionStates);
 
-                        }
+                        } // region not null
                         else {
                             if (recoveryIterations > 0) {
-                                if(LOG.isInfoEnabled()) LOG.info("Recovery completed for TM" + tmID);
+                                if(LOG.isDebugEnabled()) LOG.debug("Recovery completed for TM" + tmID);
                             }
                             recoveryIterations = -1;
                         }
@@ -1156,7 +1198,7 @@ public class HBaseTxClient {
                             }
                             retryCount = 0;
                         } catch (InterruptedException e) {
-                            LOG.error("Error in recoveryThread: " + e);
+                            LOG.error("Error in recoveryThread: ", e);
                         }
                     } catch (IOException e) {
                         int possibleRetries = 4;
@@ -1351,6 +1393,7 @@ public class HBaseTxClient {
         
         for (Map.Entry<Long, TransactionState> tsEntry : transactionStates.entrySet()) {
             int isTransactionStillAlive = 0;
+           TransactionState ts1 = null;
             TransactionState ts = tsEntry.getValue();
             Long txID = ts.getTransactionId();
             // TransactionState ts = new TransactionState(txID);
@@ -1379,9 +1422,16 @@ public class HBaseTxClient {
                         LOG.debug("TRAF RCOV THREAD:Redriving commit for " + txID + " number of regions " + ts.getParticipatingRegions().size() +
                                 " and tolerating UnknownTransactionExceptions");
                     txnManager.doCommit(ts, true /*ignore UnknownTransactionException*/);
-                    if(useTlog && useForgotten) {
-                        long nextAsn = tLog.getNextAuditSeqNum((int)TransactionState.getNodeId(txID));
-                        tLog.putSingleRecord(txID, ts.getCommitId(), "FORGOTTEN", null, forceForgotten, nextAsn);
+                    if(useTlog) {
+                        long nextAsn = tLog.getNextAuditSeqNum(myClusterId, (int)TransactionState.getNodeId(txID));
+                        tLog.putSingleRecord(txID,
+                                             ts.getStartId(),
+                                             ts.getCommitId(),
+                                             TransState.STATE_RECOVERY_COMMITTED.toString(),
+                                             ts.getParticipatingRegions(),
+                                             false,
+                                             forceForgotten,
+                                             nextAsn);
                     }
                 } else if (ts.getStatus().equals(TransState.STATE_ABORTED.toString())) {
                     if (LOG.isDebugEnabled())
@@ -1436,7 +1486,7 @@ public class HBaseTxClient {
       TransactionState value;
       int tnum = 0; // Transaction number
 
-      if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient::callRequestRegionInfo:: start\n");
+      if (LOG.isTraceEnabled()) LOG.trace(":callRequestRegionInfo:: start\n");
 
       HashMapArray hm = new HashMapArray();
 
@@ -1448,40 +1498,64 @@ public class HBaseTxClient {
           TransactionState ts = mapTransactionStates.get(id);
           final Set<TransactionRegionLocation> regions = ts.getParticipatingRegions();
 
+          // TableName
           Iterator<TransactionRegionLocation> it = regions.iterator();
-          
-          while(it.hasNext()) {
-              TransactionRegionLocation trl = it.next();
-              tablename = trl.getRegionInfo().getTable().getNameAsString();
-              if(tablename.contains("TRAFODION._MD_."))
-                 continue;
-              encoded_region_name = trl.getRegionInfo().getEncodedName();
-              region_name = trl.getRegionInfo().getRegionNameAsString();
-              boolean is_offline_bool = trl.getRegionInfo().isOffline();
-              is_offline = String.valueOf(is_offline_bool);
-              region_id = String.valueOf(trl.getRegionInfo().getRegionId());
-              thn = String.valueOf(trl.getHostname());
-              hostname = thn.substring(0, thn.length()-1);
-              port = String.valueOf(trl.getPort());              
-              startkey = Bytes.equals(trl.getRegionInfo().getStartKey(), HConstants.EMPTY_START_ROW) ?
-                            "INFINITE" : Hex.encodeHexString(trl.getRegionInfo().getStartKey()); 
-              endkey   = Bytes.equals(trl.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW) ?
-                            "INFINITE" : Hex.encodeHexString(trl.getRegionInfo().getEndKey()); 
-
-              StringBuilder inputStr = new StringBuilder();
-              inputStr.append(tablename).append(";");
-              inputStr.append(encoded_region_name).append(";");
-              inputStr.append(region_name).append(";");
-              inputStr.append(region_id).append(";");
-              inputStr.append(hostname).append(";");
-              inputStr.append(port).append(";");
-              inputStr.append(startkey).append(";");
-              inputStr.append(endkey);
-              hm.appendRegionInfo(id,  inputStr.toString());
+          tablename = it.next().getRegionInfo().getTable().getNameAsString();
+          while(it.hasNext()){
+              tablename = tablename + ";" + it.next().getRegionInfo().getTable().getNameAsString();
+          }
+          hm.addElement(tnum, "TableName", tablename);
 
-         }
-         tnum = tnum + 1;
-      }
+          // Encoded Region Name: ';'-separated encoded name of every participating region
+          Iterator<TransactionRegionLocation> it2 = regions.iterator();
+          encoded_region_name = it2.next().getRegionInfo().getEncodedName();
+          while(it2.hasNext()){
+              encoded_region_name = encoded_region_name + ";" + it2.next().getRegionInfo().getEncodedName();
+          }
+          hm.addElement(tnum, "EncodedRegionName", encoded_region_name);
+
+          // Region Name: ';'-separated full region name of every participating region
+          Iterator<TransactionRegionLocation> it3 = regions.iterator();
+          region_name = it3.next().getRegionInfo().getRegionNameAsString();
+          while(it3.hasNext()){
+              region_name = region_name + ";" + it3.next().getRegionInfo().getRegionNameAsString();
+          }
+          hm.addElement(tnum, "RegionName", region_name);
+
+          // Region Offline
+          Iterator<TransactionRegionLocation> it4 = regions.iterator();
+          boolean is_offline_bool = it4.next().getRegionInfo().isOffline();
+          is_offline = String.valueOf(is_offline_bool);
+          hm.addElement(tnum, "RegionOffline", is_offline);
+
+          // Region ID
+          Iterator<TransactionRegionLocation> it5 = regions.iterator();
+          region_id = String.valueOf(it5.next().getRegionInfo().getRegionId());
+          while(it5.hasNext()){
+              region_id = region_id + ";" + it5.next().getRegionInfo().getRegionId();
+          }
+          hm.addElement(tnum, "RegionID", region_id);
+
+          // Hostname
+          Iterator<TransactionRegionLocation> it6 = regions.iterator();
+          thn = String.valueOf(it6.next().getHostname());
+          hostname = thn.substring(0, thn.length()-1);
+          while(it6.hasNext()){
+              thn = String.valueOf(it6.next().getHostname());
+              hostname = hostname + ";" + thn.substring(0, thn.length()-1);
+          }
+          hm.addElement(tnum, "Hostname", hostname);
+
+          // Port
+          Iterator<TransactionRegionLocation> it7 = regions.iterator();
+          port = String.valueOf(it7.next().getPort());
+          while(it7.hasNext()){
+              port = port + ";" + String.valueOf(it7.next().getPort());
+          }
+          hm.addElement(tnum, "Port", port);
+
+          tnum = tnum + 1;
+        }
       if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient::callRequestRegionInfo:: end size: " + hm.getSize());
       return hm;
    }