You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2015/10/14 03:14:45 UTC

[02/11] incubator-trafodion git commit: TRAFODION-1521 Build Trafodion without having HBase installed

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java
new file mode 100644
index 0000000..27411ec
--- /dev/null
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTmZK.java
@@ -0,0 +1,247 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.dtm;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.transactional.TransactionRegionLocation;
+
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Zookeeper client-side communication class for the DTM
+ */
+public class HBaseTmZK implements Abortable{
+	private final String baseNode = "/hbase/Trafodion/recovery/TM";
+	static final Log LOG = LogFactory.getLog(HBaseTmZK.class);
+	
+	ZooKeeperWatcher zooKeeper;
+	Set<String>     nodeList;	
+	String zkNode;
+	short dtmID;
+	
+	/**
+	 * @param conf
+	 * @throws Exception
+	 */
+	public HBaseTmZK(final Configuration conf) throws Exception {
+                if (LOG.isTraceEnabled()) LOG.trace("HBaseTmZK(conf) -- ENTRY");
+		this.dtmID = 0;
+		this.zkNode = baseNode + "0";
+		this.zooKeeper = new ZooKeeperWatcher(conf, "TM Recovery", this, true);
+	}
+	
+	/**
+	 * @param conf
+	 * @param dtmID
+	 * @throws Exception
+	 */
+	public HBaseTmZK(final Configuration conf, final short dtmID) throws Exception {
+        if (LOG.isTraceEnabled()) LOG.trace("HBaseTmZK(conf, dtmID) -- ENTRY");
+		this.dtmID = dtmID;
+		this.zkNode = baseNode + String.format("%d", dtmID);
+		this.zooKeeper = new ZooKeeperWatcher(conf, "TM Recovery", this, true);
+	}
+
+	/**
+	 * @param znode
+	 * @return
+	 * @throws KeeperException
+	 */
+	private byte [] checkData (String znode) throws KeeperException, InterruptedException {
+		return ZKUtil.getData(zooKeeper, znode);
+	}
+	
+	/**
+	 * @return
+	 */
+	public short getTMID() {
+		return dtmID;
+	}
+	
+	
+	/**
+	 * @return
+	 * @throws KeeperException
+	 */
+	private List<String> getChildren() throws KeeperException {
+		return ZKUtil.listChildrenNoWatch(zooKeeper, zkNode);
+	}
+
+	/**
+	 * @return
+	 * @throws KeeperException
+	 */
+	public Map<String,byte []> checkForRecovery() throws InterruptedException, KeeperException {
+                // if (LOG.isTraceEnabled()) LOG.trace("checkForRecovery -- ENTRY");
+		if(ZKUtil.nodeHasChildren(zooKeeper, zkNode)) {
+			List<String> nodeChildren = new ArrayList<String>();
+			Map<String, byte []> nodeDataMap = new HashMap<String, byte []>();
+			nodeChildren = getChildren();
+
+			for(String node : nodeChildren) {
+
+                        if (LOG.isTraceEnabled()) LOG.trace("checkForRecovery -- found node: '" + node + "'");
+                        byte [] nodeData = checkData(zkNode +"/" + node);
+                        if (LOG.isTraceEnabled()) LOG.trace("checkForRecovery -- found node: " + node + " node data " + nodeData.toString());
+				nodeDataMap.put(node, nodeData);
+			}
+                        if (LOG.isTraceEnabled()) LOG.trace("checkForRecovery -- EXIT returning " + nodeDataMap.size() + " regions");
+                                return nodeDataMap;
+                }
+                else {
+                        // if (LOG.isTraceEnabled()) LOG.trace(zkNode + " is currently not present.");
+                        // if (LOG.isTraceEnabled()) LOG.trace("checkForRecovery -- EXIT -- node not present");
+                        return null;
+                }
+	}
+
+	/**
+	 * @param toDelete
+	 * @throws KeeperException
+	 */
+	public void deleteRegionEntry(Map.Entry<String,byte[]> toDelete ) throws KeeperException {
+           LOG.info("deleteRegionEntry -- ENTRY -- key: " + toDelete.getKey() + " value: " + new String(toDelete.getValue()));
+           ZKUtil.deleteNodeFailSilent(zooKeeper, zkNode + "/" + toDelete.getKey());
+           LOG.info("deleteRegionEntry -- EXIT ");
+        }
+
+	/**
+	 * @param node
+	 * @param hostName
+	 * @param portNumber
+	 * @param encodedName
+	 * @param data
+	 * @throws IOException
+	 */
+        public void createRecoveryzNode(String hostName, int portNumber, String encodedName, byte [] data) throws IOException {
+           LOG.info("HBaseTmZK:createRecoveryzNode: hostName: " + hostName + " port: " + portNumber +
+                     " encodedName: " + encodedName + " data: " + new String(data));
+           // default zNodePath for recovery
+           String zNodeKey = hostName + "," + portNumber + "," + encodedName;
+
+           LOG.info("HBaseTmZK:createRecoveryzNode: ZKW Post region recovery znode" + this.dtmID + " zNode Path " + zkNode);
+           // create zookeeper recovery zNode, call ZK ...
+           try {
+              if (ZKUtil.checkExists(zooKeeper, zkNode) == -1) {
+                 // create parent nodename
+                 LOG.info("HBaseTmZK:createRecoveryzNode:: ZKW create parent zNodes " + zkNode);
+                 ZKUtil.createWithParents(zooKeeper, zkNode);
+              }
+              ZKUtil.createAndFailSilent(zooKeeper, zkNode + "/" + zNodeKey, data);
+           } catch (KeeperException e) {
+              throw new IOException("HBaseTmZK:createRecoveryzNode: ZKW Unable to create recovery zNode: " + zkNode + " , throwing IOException " + e);
+           }
+        }
+        /**
+         ** @param data
+         ** #throws IOException
+         **/
+        public void createGCzNode(byte [] data) throws IOException {
+            String zNodeGCPath = "/hbase/Trafodion/GC";
+            try {
+                if (ZKUtil.checkExists(zooKeeper, zNodeGCPath) == -1) {
+                   if (LOG.isTraceEnabled()) LOG.trace("Trafodion table data clean up no znode path created " + zNodeGCPath);
+                    ZKUtil.createWithParents(zooKeeper, zNodeGCPath);
+                }
+                String zNodeKey = dtmID+"";
+                ZKUtil.createSetData(zooKeeper, zNodeGCPath + "/" + zNodeKey, data);
+            } catch (KeeperException e) {
+                throw new IOException("HBaseTmZK:createGCzNode: ZKW Unable to create GC zNode: " + zNodeGCPath +"  , throwing IOException " + e);
+            }
+        }
+
+	/**
+	 * @param node
+	 * @param recovTable
+	 * @throws IOException
+	 */
+        public void postAllRegionEntries(HTable recovTable) throws IOException {
+           LOG.info("HBaseTmZK:postAllRegionEntries: recovTable: " + recovTable );
+           NavigableMap<HRegionInfo, ServerName> regionMap = recovTable.getRegionLocations();
+           Iterator<Map.Entry<HRegionInfo, ServerName>> it =  regionMap.entrySet().iterator();
+           while(it.hasNext()) { // iterate entries.
+              NavigableMap.Entry<HRegionInfo, ServerName> pairs = it.next();
+              HRegionInfo region = pairs.getKey();
+              LOG.info("postAllRegionEntries: region: " + region.getRegionNameAsString());
+              ServerName serverValue = regionMap.get(region);
+              String hostAndPort = new String(serverValue.getHostAndPort());
+              StringTokenizer tok = new StringTokenizer(hostAndPort, ":");
+              String hostName = new String(tok.nextElement().toString());
+              int portNumber = Integer.parseInt(tok.nextElement().toString());
+              byte [] lv_byte_region_info = region.toByteArray();
+              try{
+                 LOG.info("Calling createRecoveryzNode for encoded region: " + region.getEncodedName());
+                 createRecoveryzNode(hostName, portNumber, region.getEncodedName(), lv_byte_region_info);
+              }
+              catch (Exception e2){
+                 LOG.error("postAllRegionEntries exception in createRecoveryzNode " + region.getTable().getNameAsString() +
+                           " exception: " + e2);
+              }
+           }// while
+        }
+	
+	/* (non-Javadoc)
+	 * @see org.apache.hadoop.hbase.Abortable#abort(java.lang.String, java.lang.Throwable)
+	 */
+	@Override
+	public void abort(String arg0, Throwable arg1) {
+		// TODO Auto-generated method stub
+		
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.hadoop.hbase.Abortable#isAborted()
+	 */
+	@Override
+	public boolean isAborted() {
+		// TODO Auto-generated method stub
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
new file mode 100644
index 0000000..22513cd
--- /dev/null
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -0,0 +1,1375 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+package org.trafodion.dtm;
+
+import java.util.Collections;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.Logger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.transactional.UnsuccessfulDDLException;
+import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
+import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
+import org.apache.hadoop.hbase.client.transactional.TransactionRegionLocation;
+import org.apache.hadoop.hbase.client.transactional.TransState;
+import org.apache.hadoop.hbase.client.transactional.TransReturnCode;
+import org.apache.hadoop.hbase.client.transactional.TransactionMap;
+import org.apache.hadoop.hbase.client.transactional.TransactionalReturn;
+import org.apache.hadoop.hbase.client.transactional.TmDDL;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.trafodion.dtm.HBaseTmZK;
+import org.trafodion.dtm.TmAuditTlog;
+
+import org.apache.hadoop.hbase.regionserver.transactional.IdTm;
+import org.apache.hadoop.hbase.regionserver.transactional.IdTmException;
+import org.apache.hadoop.hbase.regionserver.transactional.IdTmId;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HBaseTxClient {
+
+   static final Log LOG = LogFactory.getLog(HBaseTxClient.class);
+   private static TmAuditTlog tLog;
+   private static HBaseTmZK tmZK;
+   private static RecoveryThread recovThread;
+   private static TmDDL tmDDL;
+   private short dtmID;
+   private int stallWhere;
+   private IdTm idServer;
+   private static final int ID_TM_SERVER_TIMEOUT = 1000;
+
+   public enum AlgorithmType{
+     MVCC, SSCC
+   }
+   public AlgorithmType TRANSACTION_ALGORITHM;
+
+   boolean useTlog;
+   boolean useForgotten;
+   boolean forceForgotten;
+   boolean useRecovThread;
+   boolean useDDLTrans;
+
+   private static Configuration config;
+   TransactionManager trxManager;
+   static Map<Long, TransactionState> mapTransactionStates = TransactionMap.getInstance();
+   Map<Integer, RecoveryThread> mapRecoveryThreads = new HashMap<Integer, org.trafodion.dtm.HBaseTxClient.RecoveryThread>();
+   static final Object mapLock = new Object();
+
+   void setupLog4j() {
+        //System.out.println("In setupLog4J");
+        System.setProperty("trafodion.root", System.getenv("MY_SQROOT"));
+        String confFile = System.getenv("MY_SQROOT")
+            + "/conf/log4j.dtm.config";
+        PropertyConfigurator.configure(confFile);
+    }
+
+   public boolean init(String hBasePath, String zkServers, String zkPort) throws Exception {
+      //System.out.println("In init - hbp");
+      setupLog4j();
+      if (LOG.isDebugEnabled()) LOG.debug("Enter init, hBasePath:" + hBasePath);
+      if (LOG.isTraceEnabled()) LOG.trace("mapTransactionStates " + mapTransactionStates + " entries " + mapTransactionStates.size());
+      config = HBaseConfiguration.create();
+
+      config.set("hbase.zookeeper.quorum", zkServers);
+      config.set("hbase.zookeeper.property.clientPort",zkPort);
+      config.set("hbase.rootdir", hBasePath);
+      config.set("dtmid", "0");
+      this.dtmID = 0;
+      this.useRecovThread = false;
+      this.stallWhere = 0;
+
+      useForgotten = true;
+      try {
+         String useAuditRecords = System.getenv("TM_ENABLE_FORGOTTEN_RECORDS");
+         if (useAuditRecords != null) {
+            useForgotten = (Integer.parseInt(useAuditRecords) != 0);
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_FORGOTTEN_RECORDS is not in ms.env");
+      }
+      LOG.info("useForgotten is " + useForgotten);
+
+      forceForgotten = false;
+      try {
+         String forgottenForce = System.getenv("TM_TLOG_FORCE_FORGOTTEN");
+         if (forgottenForce != null){
+            forceForgotten = (Integer.parseInt(forgottenForce) != 0);
+            if (LOG.isDebugEnabled()) LOG.debug("forgottenForce != null");
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_FORCE_FORGOTTEN is not in ms.env");
+      }
+      LOG.info("forceForgotten is " + forceForgotten);
+
+      useTlog = false;
+      useRecovThread = false;
+      try {
+         String useAudit = System.getenv("TM_ENABLE_TLOG_WRITES");
+         if (useAudit != null) {
+            useTlog = useRecovThread = (Integer.parseInt(useAudit) != 0);
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_TLOG_WRITES is not in ms.env");
+      }
+
+      if (useTlog) {
+         try {
+            tLog = new TmAuditTlog(config);
+         } catch (Exception e ){
+            LOG.error("Unable to create TmAuditTlog, throwing exception");
+            throw new RuntimeException(e);
+         }
+      }
+      try {
+        trxManager = TransactionManager.getInstance(config);
+      } catch (IOException e ){
+          LOG.error("Unable to create TransactionManager, throwing exception");
+          throw new RuntimeException(e);
+      }
+
+      if (useRecovThread) {
+         if (LOG.isDebugEnabled()) LOG.debug("Starting recovery thread for tm ID: " + dtmID);
+          try {
+              tmZK = new HBaseTmZK(config, dtmID);
+          }catch (IOException e ){
+              LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception");
+              throw new RuntimeException(e);
+          }
+          recovThread = new RecoveryThread(tLog, tmZK, trxManager);
+          recovThread.start();
+      }
+      if (LOG.isDebugEnabled()) LOG.debug("Exit init(String, String, String)");
+      return true;
+   }
+
+   public boolean init(short dtmid) throws Exception {
+      //System.out.println("In init - dtmId" + dtmid);
+
+      setupLog4j();
+      if (LOG.isDebugEnabled()) LOG.debug("Enter init(" + dtmid + ")");
+      config = HBaseConfiguration.create();
+      config.set("hbase.hregion.impl", "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion");
+      config.set("hbase.hlog.splitter.impl", "org.apache.hadoop.hbase.regionserver.transactional.THLogSplitter");
+      config.set("dtmid", String.valueOf(dtmid));
+      config.set("CONTROL_POINT_TABLE_NAME", "TRAFODION._DTM_.TLOG" + String.valueOf(dtmid) + "_CONTROL_POINT");
+      config.set("TLOG_TABLE_NAME", "TRAFODION._DTM_.TLOG" + String.valueOf(dtmid));
+
+      this.dtmID = dtmid;
+      this.useRecovThread = false;
+      this.stallWhere = 0;
+      this.useDDLTrans = false;
+
+      String useSSCC = System.getenv("TM_USE_SSCC");
+      TRANSACTION_ALGORITHM = AlgorithmType.MVCC;
+      if (useSSCC != null)
+         TRANSACTION_ALGORITHM = (Integer.parseInt(useSSCC) == 1) ? AlgorithmType.SSCC :AlgorithmType.MVCC ;
+
+      try {
+         idServer = new IdTm(false);
+      }
+      catch (Exception e){
+         LOG.error("Exception creating new IdTm: " + e);
+      }
+
+      try {
+         String useDDLTransactions = System.getenv("TM_ENABLE_DDL_TRANS");
+         if (useDDLTransactions != null) {
+             useDDLTrans = (Integer.parseInt(useDDLTransactions) != 0);
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_DDL_TRANS is not in ms.env");
+      }
+
+      if(useDDLTrans){
+         try {
+            tmDDL = new TmDDL(config);
+         }
+         catch (Exception e) {
+            LOG.error("Unable to create TmDDL, throwing exception " + e);
+            e.printStackTrace();
+            throw new RuntimeException(e);
+         }
+      }
+
+      useForgotten = true;
+      try {
+         String useAuditRecords = System.getenv("TM_ENABLE_FORGOTTEN_RECORDS");
+         if (useAuditRecords != null) {
+            useForgotten = (Integer.parseInt(useAuditRecords) != 0);
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_FORGOTTEN_RECORDS is not in ms.env");
+      }
+      LOG.info("useForgotten is " + useForgotten);
+
+      forceForgotten = false;
+      try {
+         String forgottenForce = System.getenv("TM_TLOG_FORCE_FORGOTTEN");
+         if (forgottenForce != null){
+            forceForgotten = (Integer.parseInt(forgottenForce) != 0);
+            if (LOG.isDebugEnabled()) LOG.debug("forgottenForce != null");
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_FORCE_FORGOTTEN is not in ms.env");
+      }
+      LOG.info("forceForgotten is " + forceForgotten);
+
+      useTlog = false;
+      useRecovThread = false;
+      try {
+         String useAudit = System.getenv("TM_ENABLE_TLOG_WRITES");
+         if (useAudit != null){
+            useTlog = useRecovThread = (Integer.parseInt(useAudit) != 0);
+         }
+      }
+      catch (Exception e) {
+         if (LOG.isDebugEnabled()) LOG.debug("TM_ENABLE_TLOG_WRITES is not in ms.env");
+      }
+      if (useTlog) {
+         try {
+            tLog = new TmAuditTlog(config);
+         } catch (Exception e ){
+            LOG.error("Unable to create TmAuditTlog, throwing exception " + e);
+            e.printStackTrace();
+            throw new RuntimeException(e);
+         }
+      }
+      try {
+          trxManager = TransactionManager.getInstance(config);
+      } catch (IOException e ){
+            LOG.error("Unable to create TransactionManager, Exception: " + e + "throwing new RuntimeException");
+            throw new RuntimeException(e);
+      }
+
+      if(useDDLTrans)
+          trxManager.init(tmDDL);
+
+      if (useRecovThread) {
+         if (LOG.isDebugEnabled()) LOG.debug("Entering recovThread Usage");
+          try {
+              tmZK = new HBaseTmZK(config, dtmID);
+          }catch (IOException e ){
+              LOG.error("Unable to create HBaseTmZK TM-zookeeper class, throwing exception");
+              throw new RuntimeException(e);
+          }
+          recovThread = new RecoveryThread(tLog,
+                                           tmZK,
+                                           trxManager,
+                                           this,
+                                           useForgotten,
+                                           forceForgotten,
+                                           useTlog);
+          recovThread.start();
+      }
+      if (LOG.isTraceEnabled()) LOG.trace("Exit init()");
+      return true;
+   }
+
+   public TmDDL getTmDDL() {
+        return tmDDL;
+   }
+
+   public void nodeDown(int nodeID) throws IOException {
+       if(LOG.isTraceEnabled()) LOG.trace("nodeDown -- ENTRY node ID: " + nodeID);
+
+       RecoveryThread newRecovThread;
+       if(dtmID == nodeID)
+           throw new IOException("Down node ID is the same as current dtmID, Incorrect parameter");
+
+       try {
+           if(mapRecoveryThreads.containsKey(nodeID)) {
+               if(LOG.isDebugEnabled()) LOG.debug("nodeDown called on a node that already has RecoveryThread running node ID: " + nodeID);
+           }
+           else {
+               newRecovThread = new RecoveryThread(tLog,
+                                                   new HBaseTmZK(config, (short) nodeID),
+                                                   trxManager,
+                                                   this,
+                                                   useForgotten,
+                                                   forceForgotten,
+                                                   useTlog);
+               newRecovThread.start();
+               mapRecoveryThreads.put(nodeID, recovThread);
+               if(LOG.isTraceEnabled()) LOG.trace("nodeDown -- mapRecoveryThreads size: " + mapRecoveryThreads.size());
+           }
+       }
+       catch(Exception e) {
+           LOG.error("Unable to create rescue recovery thread for TM" + dtmID);
+       }
+       if(LOG.isTraceEnabled()) LOG.trace("nodeDown -- EXIT node ID: " + nodeID);
+   }
+
+   public void nodeUp(int nodeID) throws IOException {
+       if(LOG.isTraceEnabled()) LOG.trace("nodeUp -- ENTRY node ID: " + nodeID);
+       RecoveryThread rt = mapRecoveryThreads.get(nodeID);
+       if(rt == null) {
+           if(LOG.isWarnEnabled()) LOG.warn("nodeUp called on a node that has RecoveryThread removed already, node ID: " + nodeID);
+           if(LOG.isTraceEnabled()) LOG.trace("nodeUp -- EXIT node ID: " + nodeID);
+           return;
+       }
+       rt.stopThread();
+       try {
+           rt.join();
+       } catch (Exception e) { LOG.warn("Problem while waiting for the recovery thread to stop for node ID: " + nodeID); }
+       mapRecoveryThreads.remove(nodeID);
+       if(LOG.isTraceEnabled()) LOG.trace("nodeUp -- mapRecoveryThreads size: " + mapRecoveryThreads.size());
+       if(LOG.isTraceEnabled()) LOG.trace("nodeUp -- EXIT node ID: " + nodeID);
+   }
+
+   public short stall (int where) {
+      if (LOG.isDebugEnabled()) LOG.debug("Entering stall with parameter " + where);
+      this.stallWhere = where;
+      return TransReturnCode.RET_OK.getShort();
+   }
+
+   public long beginTransaction(final long transactionId) throws Exception {
+
+      if (LOG.isTraceEnabled()) LOG.trace("Enter beginTransaction, txid: " + transactionId);
+      TransactionState tx = trxManager.beginTransaction(transactionId);
+      if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:beginTransaction new transactionState created: " + tx);
+      if(tx == null) {
+         LOG.error("null Transaction State returned by the Transaction Manager, txid: " + transactionId);
+         throw new Exception("TransactionState is null");
+      }
+
+      synchronized(mapLock) {
+         TransactionState tx2 = mapTransactionStates.get(transactionId);
+         if (tx2 != null) {
+            // Some other thread added the transaction while we were creating one.  It's already in the
+            // map, so we can use the existing one.
+            if (LOG.isDebugEnabled()) LOG.debug("HBaseTxClient:beginTransaction, found TransactionState object while creating a new one " + tx2);
+            tx = tx2;
+         }
+         else {
+            if (LOG.isDebugEnabled()) LOG.debug("HBaseTxClient:beginTransaction, adding new TransactionState to map " + tx);
+            mapTransactionStates.put(transactionId, tx);
+         }
+      }
+
+      if (LOG.isDebugEnabled()) LOG.debug("Exit beginTransaction, Transaction State: " + tx + " mapsize: " + mapTransactionStates.size());
+      return transactionId;
+   }
+
+   public short abortTransaction(final long transactionID) throws Exception {
+      if (LOG.isDebugEnabled()) LOG.debug("Enter abortTransaction, txid: " + transactionID);
+      TransactionState ts = mapTransactionStates.get(transactionID);
+
+      if(ts == null) {
+          LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " retval: " + TransReturnCode.RET_NOTX.toString());
+          return TransReturnCode.RET_NOTX.getShort();
+      }
+
+      try {
+         ts.setStatus(TransState.STATE_ABORTED);
+         if (useTlog) {
+            tLog.putSingleRecord(transactionID, -1, "ABORTED", ts.getParticipatingRegions(), false);
+         }
+      } catch(Exception e) {
+         LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " tLog.putRecord: EXCEPTION");
+         return TransReturnCode.RET_EXCEPTION.getShort();
+      }
+
+      if ((stallWhere == 1) || (stallWhere == 3)) {
+         LOG.info("Stalling in phase 2 for abortTransaction");
+         Thread.sleep(300000); // Initially set to run every 5 min
+      }
+
+      try {
+         trxManager.abort(ts);
+      } catch(IOException e) {
+          synchronized(mapLock) {
+             mapTransactionStates.remove(transactionID);
+          }
+          LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " retval: EXCEPTION");
+          return TransReturnCode.RET_EXCEPTION.getShort();
+      }
+      catch (UnsuccessfulDDLException ddle) {
+          LOG.error("FATAL DDL Exception from HBaseTxClient:abort, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionID);
+
+          //Reaching here means several attempts to perform the DDL operation has failed in abort phase.
+          //Generally if only DML operation is involved, returning error causes TM to call completeRequest()
+          //which causes a hang(abort work is outstanding forever) due to doAbortX thread holding the
+          //commitSendLock (since doAbortX raised an exception and exited without clearing the commitSendLock count).
+          //In the case of DDL exception, no doAbortX thread is involved and commitSendLock is not held. Hence to mimic
+          //the same hang behaviour, the current worker thread will be put to wait indefinitely for user intervention.
+          //Long Term solution to this behaviour is currently TODO.
+          Object commitDDLLock = new Object();
+          synchronized(commitDDLLock)
+          {
+            commitDDLLock.wait();
+          }
+          return TransReturnCode.RET_EXCEPTION.getShort();
+      }
+      if (useTlog && useForgotten) {
+         if (forceForgotten) {
+            tLog.putSingleRecord(transactionID, -1, "FORGOTTEN", null, true);
+         }
+         else {
+            tLog.putSingleRecord(transactionID, -1, "FORGOTTEN", null, false);
+         }
+      }
+ //     mapTransactionStates.remove(transactionID);
+
+      if (LOG.isTraceEnabled()) LOG.trace("Exit abortTransaction, retval: OK txid: " + transactionID + " mapsize: " + mapTransactionStates.size());
+      return TransReturnCode.RET_OK.getShort();
+   }
+
+   public short prepareCommit(long transactionId) throws Exception {
+     if (LOG.isDebugEnabled()) LOG.debug("Enter prepareCommit, txid: " + transactionId);
+     if (LOG.isTraceEnabled()) LOG.trace("mapTransactionStates " + mapTransactionStates + " entries " + mapTransactionStates.size());
+        TransactionState ts = mapTransactionStates.get(transactionId);
+     if(ts == null) {
+       LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOTX.toString());
+       return TransReturnCode.RET_NOTX.getShort();
+     }
+
+     try {
+        short result = (short) trxManager.prepareCommit(ts);
+        if (LOG.isDebugEnabled()) LOG.debug("prepareCommit, [ " + ts + " ], result " + result + ((result == TransactionalReturn.COMMIT_OK_READ_ONLY)?", Read-Only":""));
+        switch (result) {
+          case TransactionalReturn.COMMIT_OK:
+             if (LOG.isTraceEnabled()) LOG.trace("Exit OK prepareCommit, txid: " + transactionId);
+             return TransReturnCode.RET_OK.getShort();
+          case TransactionalReturn.COMMIT_OK_READ_ONLY:
+             synchronized(mapLock) {
+                mapTransactionStates.remove(transactionId);
+             }
+             if (LOG.isTraceEnabled()) LOG.trace("Exit OK_READ_ONLY prepareCommit, txid: " + transactionId);
+             return TransReturnCode.RET_READONLY.getShort();
+          case TransactionalReturn.COMMIT_UNSUCCESSFUL:
+             LOG.info("Exit RET_EXCEPTION prepareCommit, txid: " + transactionId);
+             return TransReturnCode.RET_EXCEPTION.getShort();
+          case TransactionalReturn.COMMIT_CONFLICT:
+             LOG.info("Exit RET_HASCONFLICT prepareCommit, txid: " + transactionId);
+             return TransReturnCode.RET_HASCONFLICT.getShort();
+          default:
+             LOG.info("Exit default RET_EXCEPTION prepareCommit, txid: " + transactionId);
+             return TransReturnCode.RET_EXCEPTION.getShort();
+        }
+     } catch (IOException e) {
+       LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException");
+       return TransReturnCode.RET_IOEXCEPTION.getShort();
+     } catch (CommitUnsuccessfulException e) {
+       LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException");
+       return TransReturnCode.RET_NOCOMMITEX.getShort();
+     }
+     catch (Exception e) {
+           LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " Exception " + e);
+           return TransReturnCode.RET_NOCOMMITEX.getShort();
+     }
+   }
+
+   public short doCommit(long transactionId) throws Exception {
+       if (LOG.isDebugEnabled()) LOG.debug("Enter doCommit, txid: " + transactionId);
+       TransactionState ts = mapTransactionStates.get(transactionId);
+
+       if(ts == null) {
+      LOG.error("Returning from HBaseTxClient:doCommit, (null tx) retval: " + TransReturnCode.RET_NOTX.toString() + " txid: " + transactionId);
+          return TransReturnCode.RET_NOTX.getShort();
+       }
+
+       // Set the commitId
+       IdTmId commitId = null;
+       if (TRANSACTION_ALGORITHM == AlgorithmType.SSCC) {
+          try {
+             commitId = new IdTmId();
+             if (LOG.isTraceEnabled()) LOG.trace("doCommit getting new commitId");
+             idServer.id(ID_TM_SERVER_TIMEOUT, commitId);
+             if (LOG.isTraceEnabled()) LOG.trace("doCommit idServer.id returned: " + commitId.val);
+          } catch (IdTmException exc) {
+             LOG.error("doCommit: IdTm threw exception " + exc);
+             throw new CommitUnsuccessfulException("doCommit: IdTm threw exception " + exc);
+          }
+       }
+
+       final long commitIdVal = (TRANSACTION_ALGORITHM == AlgorithmType.SSCC) ? commitId.val : -1;
+       if (LOG.isTraceEnabled()) LOG.trace("doCommit setting commitId (" + commitIdVal + ") for tx: " + ts.getTransactionId());
+       ts.setCommitId(commitIdVal);
+
+       try {
+          ts.setStatus(TransState.STATE_COMMITTED);
+          if (useTlog) {
+             tLog.putSingleRecord(transactionId, commitIdVal, "COMMITTED", ts.getParticipatingRegions(), true);
+          }
+       } catch(Exception e) {
+          LOG.error("Returning from HBaseTxClient:doCommit, txid: " + transactionId + " tLog.putRecord: EXCEPTION " + e);
+          return TransReturnCode.RET_EXCEPTION.getShort();
+       }
+
+       if ((stallWhere == 2) || (stallWhere == 3)) {
+          LOG.info("Stalling in phase 2 for doCommit");
+          Thread.sleep(300000); // Initially set to run every 5 min
+       }
+
+       try {
+          trxManager.doCommit(ts);
+       } catch (CommitUnsuccessfulException e) {
+          LOG.error("Returning from HBaseTxClient:doCommit, retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException" + " txid: " + transactionId);
+          return TransReturnCode.RET_EXCEPTION.getShort();
+       }
+       catch (UnsuccessfulDDLException ddle) {
+          LOG.error("FATAL DDL Exception from HBaseTxClient:doCommit, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionId);
+
+          //Reaching here means several attempts to perform the DDL operation has failed in commit phase.
+          //Generally if only DML operation is involved, returning error causes TM to call completeRequest()
+          //which causes a hang(commit work is outstanding forever) due to doCommitX thread holding the
+          //commitSendLock (since doCommitX raised an exception and exited without clearing the commitSendLock count).
+          //In the case of DDL exception, no doCommitX thread is involved and commitSendLock is not held. Hence to mimic
+          //the same hang behaviour, the current worker thread will be put to wait indefinitely for user intervention.
+          //Long Term solution to this behaviour is currently TODO.
+          Object commitDDLLock = new Object();
+          synchronized(commitDDLLock)
+          {
+            commitDDLLock.wait();
+          }
+          return TransReturnCode.RET_EXCEPTION.getShort();
+       }
+       if (useTlog && useForgotten) {
+          if (forceForgotten) {
+             tLog.putSingleRecord(transactionId, commitIdVal, "FORGOTTEN", null, true);
+          }
+          else {
+             tLog.putSingleRecord(transactionId, commitIdVal, "FORGOTTEN", null, false);
+          }
+       }
+//       mapTransactionStates.remove(transactionId);
+
+       if (LOG.isTraceEnabled()) LOG.trace("Exit doCommit, retval(ok): " + TransReturnCode.RET_OK.toString() +
+                         " txid: " + transactionId + " mapsize: " + mapTransactionStates.size());
+
+       return TransReturnCode.RET_OK.getShort();
+   }
+
+   public short completeRequest(long transactionId) throws Exception {
+     if (LOG.isDebugEnabled()) LOG.debug("Enter completeRequest, txid: " + transactionId);
+     TransactionState ts = mapTransactionStates.get(transactionId);
+
+     if(ts == null) {
+          LOG.error("Returning from HBaseTxClient:completeRequest, (null tx) retval: " + TransReturnCode.RET_NOTX.toString() + " txid: " + transactionId);
+          return TransReturnCode.RET_NOTX.getShort();
+       }
+
+       try {
+
+       if (LOG.isTraceEnabled()) LOG.trace("TEMP completeRequest Calling CompleteRequest() Txid :" + transactionId);
+
+          ts.completeRequest();
+       } catch(Exception e) {
+          LOG.error("Returning from HBaseTxClient:completeRequest, ts.completeRequest: txid: " + transactionId + ", EXCEPTION: " + e);
+       throw new Exception("Exception during completeRequest, unable to commit.  Exception: " + e);
+       }
+
+     synchronized(mapLock) {
+        mapTransactionStates.remove(transactionId);
+     }
+
+     if (LOG.isDebugEnabled()) LOG.debug("Exit completeRequest txid: " + transactionId + " mapsize: " + mapTransactionStates.size());
+     return TransReturnCode.RET_OK.getShort();
+   }
+
+   public short tryCommit(long transactionId) throws Exception {
+     if (LOG.isDebugEnabled()) LOG.debug("Enter tryCommit, txid: " + transactionId);
+     short err, commitErr, abortErr = TransReturnCode.RET_OK.getShort();
+
+     try {
+       err = prepareCommit(transactionId);
+       if (err != TransReturnCode.RET_OK.getShort()) {
+         if (LOG.isDebugEnabled()) LOG.debug("tryCommit prepare failed with error " + err);
+         return err;
+       }
+       commitErr = doCommit(transactionId);
+       if (commitErr != TransReturnCode.RET_OK.getShort()) {
+         LOG.error("doCommit for committed transaction " + transactionId + " failed with error " + commitErr);
+         // It is a violation of 2 PC protocol to try to abort the transaction after prepare
+         return commitErr;
+//         abortErr = abortTransaction(transactionId);
+//         if (LOG.isDebugEnabled()) LOG.debug("tryCommit commit failed and was aborted. Commit error " +
+//                   commitErr + ", Abort error " + abortErr);
+       }
+
+       if (LOG.isTraceEnabled()) LOG.trace("TEMP tryCommit Calling CompleteRequest() Txid :" + transactionId);
+
+       err = completeRequest(transactionId);
+       if (err != TransReturnCode.RET_OK.getShort()){
+         if (LOG.isDebugEnabled()) LOG.debug("tryCommit completeRequest for transaction " + transactionId + " failed with error " + err);
+       }
+     } catch(Exception e) {
+       mapTransactionStates.remove(transactionId);
+       LOG.error("Returning from HBaseTxClient:tryCommit, ts: EXCEPTION" + " txid: " + transactionId);
+       throw new Exception("Exception " + e + "during tryCommit, unable to commit.");
+    }
+
+    synchronized(mapLock) {
+       mapTransactionStates.remove(transactionId);
+    }
+
+    if (LOG.isDebugEnabled()) LOG.debug("Exit completeRequest txid: " + transactionId + " mapsize: " + mapTransactionStates.size());
+    return TransReturnCode.RET_OK.getShort();
+  }
+
+   public short callCreateTable(long transactionId, byte[] pv_htbldesc, Object[]  beginEndKeys) throws Exception
+   {
+      TransactionState ts;
+      HTableDescriptor htdesc;
+
+      if (LOG.isTraceEnabled()) LOG.trace("Enter callCreateTable, txid: [" + transactionId + "],  htbldesc bytearray: " + pv_htbldesc + "desc in hex: " + Hex.encodeHexString(pv_htbldesc));
+
+      ts = mapTransactionStates.get(transactionId);
+      if(ts == null) {
+         LOG.error("Returning from HBaseTxClient:callCreateTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txid: " + transactionId);
+         return TransReturnCode.RET_NOTX.getShort();
+      }
+
+      try {
+         htdesc = HTableDescriptor.parseFrom(pv_htbldesc);
+      }
+      catch(Exception e) {
+         if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callCreateTable exception in htdesc parseFrom, retval: " +
+            TransReturnCode.RET_EXCEPTION.toString() +
+            " txid: " + transactionId +
+            " DeserializationException: " + e);
+         StringWriter sw = new StringWriter();
+         PrintWriter pw = new PrintWriter(sw);
+         e.printStackTrace(pw);
+         LOG.error(sw.toString());
+
+         throw new Exception("DeserializationException in callCreateTable parseFrom, unable to send callCreateTable");
+      }
+
+      try {
+         trxManager.createTable(ts, htdesc, beginEndKeys);
+      }
+      catch (Exception cte) {
+         if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callCreateTable exception trxManager.createTable, retval: " +
+            TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + cte);
+         StringWriter sw = new StringWriter();
+         PrintWriter pw = new PrintWriter(sw);
+         cte.printStackTrace(pw);
+         LOG.error("HBaseTxClient createTable call error: " + sw.toString());
+
+         throw new Exception("createTable call error");
+      }
+      return TransReturnCode.RET_OK.getShort();
+   }
+
+   public short callAlterTable(long transactionId, byte[] pv_tblname, Object[] tableOptions) throws Exception
+   {
+      TransactionState ts;
+      String strTblName = new String(pv_tblname, "UTF-8");
+
+      if (LOG.isTraceEnabled()) LOG.trace("Enter callAlterTable, txid: [" + transactionId + "],  tableName: " + strTblName);
+
+      ts = mapTransactionStates.get(transactionId);
+      if(ts == null) {
+         LOG.error("Returning from HBaseTxClient:callAlterTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txid: " + transactionId);
+         return TransReturnCode.RET_NOTX.getShort();
+      }
+
+      try {
+         trxManager.alterTable(ts, strTblName, tableOptions);
+      }
+      catch (Exception cte) {
+         if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callAlterTable exception trxManager.alterTable, retval: " +
+            TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + cte);
+         StringWriter sw = new StringWriter();
+         PrintWriter pw = new PrintWriter(sw);
+         cte.printStackTrace(pw);
+         LOG.error("HBaseTxClient alterTable call error: " + sw.toString());
+
+         throw new Exception("alterTable call error");
+      }
+      return TransReturnCode.RET_OK.getShort();
+   }
+
+   public short callRegisterTruncateOnAbort(long transactionId, byte[] pv_tblname) throws Exception
+   {
+      TransactionState ts;
+      String strTblName = new String(pv_tblname, "UTF-8");
+
+      if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterTruncateOnAbort, txid: [" + transactionId + "],  tablename: " + strTblName);
+
+      ts = mapTransactionStates.get(transactionId);
+      if(ts == null) {
+         LOG.error("Returning from HBaseTxClient:callRegisterTruncateOnAbort, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txid: " + transactionId);
+         return TransReturnCode.RET_NOTX.getShort();
+      }
+
+      try {
+         trxManager.registerTruncateOnAbort(ts, strTblName);
+      }
+      catch (Exception e) {
+         if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterTruncateOnAbort exception trxManager.registerTruncateOnAbort, retval: " +
+            TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + e);
+         StringWriter sw = new StringWriter();
+         PrintWriter pw = new PrintWriter(sw);
+         e.printStackTrace(pw);
+         String msg = "HBaseTxClient registerTruncateOnAbort call error ";
+         LOG.error(msg + " : " + sw.toString());
+         throw new Exception(msg);
+      }
+      return TransReturnCode.RET_OK.getShort();
+   }
+
+   public short callDropTable(long transactionId, byte[] pv_tblname) throws Exception
+   {
+      TransactionState ts;
+      String strTblName = new String(pv_tblname, "UTF-8");
+
+      if (LOG.isTraceEnabled()) LOG.trace("Enter callDropTable, txid: [" + transactionId + "],  tablename: " + strTblName);
+
+      ts = mapTransactionStates.get(transactionId);
+      if(ts == null) {
+         LOG.error("Returning from HBaseTxClient:callDropTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort()  + " txid: " + transactionId);
+         return TransReturnCode.RET_NOTX.getShort();
+      }
+
+      try {
+         trxManager.dropTable(ts, strTblName);
+      }
+      catch (Exception cte) {
+         if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callDropTable exception trxManager.dropTable, retval: " +
+            TransReturnCode.RET_EXCEPTION.toString() +" txid: " + transactionId +" Exception: " + cte);
+         StringWriter sw = new StringWriter();
+         PrintWriter pw = new PrintWriter(sw);
+         cte.printStackTrace(pw);
+         LOG.error("HBaseTxClient dropTable call error: " + sw.toString());
+      }
+      return TransReturnCode.RET_OK.getShort();
+   }
+
+    public short callRegisterRegion(long transactionId,
+                                    long startId,
+                                    int  pv_port,
+                                    byte[] pv_hostname,
+                                    long pv_startcode,
+                                    byte[] pv_regionInfo) throws Exception {
+       String hostname    = new String(pv_hostname);
+       if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterRegion, txid: [" + transactionId + "], startId: " + startId + ", port: "
+           + pv_port + ", hostname: " + hostname + ", reg info len: " + pv_regionInfo.length + " " + new String(pv_regionInfo, "UTF-8"));
+
+       HRegionInfo lv_regionInfo;
+       try {
+          lv_regionInfo = HRegionInfo.parseFrom(pv_regionInfo);
+       }
+       catch (Exception de) {
+          if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion exception in lv_regionInfo parseFrom, retval: " +
+             TransReturnCode.RET_EXCEPTION.toString() + " txid: " + transactionId + " DeserializationException: " + de);
+          StringWriter sw = new StringWriter();
+          PrintWriter pw = new PrintWriter(sw);
+          de.printStackTrace(pw);
+          LOG.error(sw.toString());
+
+          throw new Exception("DeserializationException in lv_regionInfo parseFrom, unable to register region");
+       }
+
+       // TODO Not in CDH 5.1       ServerName lv_servername = ServerName.valueOf(hostname, pv_port, pv_startcode);
+       String lv_hostname_port_string = hostname + ":" + pv_port;
+       String lv_servername_string = ServerName.getServerName(lv_hostname_port_string, pv_startcode);
+       ServerName lv_servername = ServerName.parseServerName(lv_servername_string);
+       TransactionRegionLocation regionLocation = new TransactionRegionLocation(lv_regionInfo, lv_servername);
+       String regionTableName = regionLocation.getRegionInfo().getTable().getNameAsString();
+
+       TransactionState ts = mapTransactionStates.get(transactionId);
+       if(ts == null) {
+          if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion transactionId (" + transactionId +
+                   ") not found in mapTransactionStates of size: " + mapTransactionStates.size());
+          try {
+             ts = trxManager.beginTransaction(transactionId);
+          }
+          catch (IdTmException exc) {
+             LOG.error("HBaseTxClient: beginTransaction for tx (" + transactionId + ") caught exception " + exc);
+             throw new IdTmException("HBaseTxClient: beginTransaction for tx (" + transactionId + ") caught exception " + exc);
+          }
+          synchronized (mapLock) {
+             TransactionState ts2 = mapTransactionStates.get(transactionId);
+             if (ts2 != null) {
+                // Some other thread added the transaction while we were creating one.  It's already in the
+                // map, so we can use the existing one.
+                if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion, found TransactionState object while creating a new one " + ts2);
+                ts = ts2;
+             }
+             else {
+                ts.setStartId(startId);
+                if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion new transactionState created: " + ts );
+             }
+          }// end synchronized
+       }
+       else {
+          if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion existing transactionState found: " + ts );
+          if (ts.getStartId() == -1) {
+            ts.setStartId(startId);
+            if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion reset startId for transactionState: " + ts );
+          }
+       }
+
+       try {
+          trxManager.registerRegion(ts, regionLocation);
+       } catch (IOException e) {
+          LOG.error("HBaseTxClient:callRegisterRegion exception in registerRegion call, txid: " + transactionId +
+            " retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException " + e);
+          return TransReturnCode.RET_EXCEPTION.getShort();
+       }
+
+       if (LOG.isDebugEnabled()) LOG.debug("RegisterRegion adding table name " + regionTableName);
+       ts.addTableName(regionTableName);
+
+       // Removing unnecessary put back into the map
+       // mapTransactionStates.put(ts.getTransactionId(), ts);
+
+       if (LOG.isTraceEnabled()) LOG.trace("Exit callRegisterRegion, txid: [" + transactionId + "] with mapsize: "
+                  + mapTransactionStates.size());
+       return TransReturnCode.RET_OK.getShort();
+   }
+
+   public int participatingRegions(long transactionId) throws Exception {
+       if (LOG.isTraceEnabled()) LOG.trace("Enter participatingRegions, txid: " + transactionId);
+       TransactionState ts = mapTransactionStates.get(transactionId);
+       if(ts == null) {
+         if (LOG.isTraceEnabled()) LOG.trace("Returning from HBaseTxClient:participatingRegions, txid: " + transactionId + " not found returning: 0");
+          return 0;
+       }
+       int participants = ts.getParticipantCount() - ts.getRegionsToIgnoreCount();
+       if (LOG.isTraceEnabled()) LOG.trace("Exit participatingRegions , txid: [" + transactionId + "] " + participants + " participants");
+       return (ts.getParticipantCount() - ts.getRegionsToIgnoreCount());
+   }
+
+   public long addControlPoint() throws Exception {
+      if (LOG.isTraceEnabled()) LOG.trace("Enter addControlPoint");
+      long result = 0L;
+      try {
+         if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient calling tLog.addControlPoint with mapsize " + mapTransactionStates.size());
+         result = tLog.addControlPoint(mapTransactionStates);
+      }
+      catch(IOException e){
+          LOG.error("addControlPoint IOException " + e);
+          throw e;
+      }
+      Long lowestStartId = Long.MAX_VALUE;
+      for(ConcurrentHashMap.Entry<Long, TransactionState> entry : mapTransactionStates.entrySet()){
+          TransactionState value;
+          value = entry.getValue();
+          long ts = value.getStartId();
+          if( ts < lowestStartId) lowestStartId = ts;
+      }
+      if(lowestStartId < Long.MAX_VALUE)
+      {
+          tmZK.createGCzNode(Bytes.toBytes(lowestStartId));
+      }
+      if (LOG.isTraceEnabled()) LOG.trace("Exit addControlPoint, returning: " + result);
+      return result;
+   }
+
+     /**
+      * Thread to gather recovery information for regions that need to be recovered 
+      */
+     private static class RecoveryThread extends Thread{
+             final int SLEEP_DELAY = 1000; // Initially set to run every 1sec
+             private int sleepTimeInt = 0;
+             private boolean skipSleep = false;
+             private TmAuditTlog audit;
+             private HBaseTmZK zookeeper;
+             private TransactionManager txnManager;
+             private short tmID;
+             private Set<Long> inDoubtList;
+             private boolean continueThread = true;
+             private int recoveryIterations = -1;
+             private int retryCount = 0;
+             private boolean useForgotten;
+             private boolean forceForgotten;
+             private boolean useTlog;
+             HBaseTxClient hbtx;
+
+         public RecoveryThread(TmAuditTlog audit,
+                               HBaseTmZK zookeeper,
+                               TransactionManager txnManager,
+                               HBaseTxClient hbtx,
+                               boolean useForgotten,
+                               boolean forceForgotten,
+                               boolean useTlog) {
+             this(audit, zookeeper, txnManager);
+             this.hbtx = hbtx;
+             this.useForgotten = useForgotten;
+             this.forceForgotten = forceForgotten;
+             this.useTlog= useTlog;
+         }
+             /**
+              *
+              * @param audit
+              * @param zookeeper
+              * @param txnManager
+              */
+             public RecoveryThread(TmAuditTlog audit,
+                                   HBaseTmZK zookeeper,
+                                   TransactionManager txnManager)
+             {
+                          this.audit = audit;
+                          this.zookeeper = zookeeper;
+                          this.txnManager = txnManager;
+                          this.inDoubtList = new HashSet<Long> ();
+                          this.tmID = zookeeper.getTMID();
+
+                          String sleepTime = System.getenv("TMRECOV_SLEEP");
+                          if (sleepTime != null) {
+                                this.sleepTimeInt = Integer.parseInt(sleepTime);
+                                if(LOG.isDebugEnabled()) LOG.debug("Recovery thread sleep set to: " +
+                                                                   this.sleepTimeInt + "ms");
+                          }
+             }
+
+             public void stopThread() {
+                 this.continueThread = false;
+             }
+
+             private void addRegionToTS(String hostnamePort, byte[] regionInfo, TransactionState ts) throws Exception{
+                 HRegionInfo regionInfoLoc; // = new HRegionInfo();
+                 final byte [] delimiter = ",".getBytes();
+                 String[] result = hostnamePort.split(new String(delimiter), 3);
+
+                 if (result.length < 2)
+                         throw new IllegalArgumentException("Region array format is incorrect");
+
+                 String hostname = result[0];
+                 int port = Integer.parseInt(result[1]);
+                 try {
+                                 regionInfoLoc = HRegionInfo.parseFrom(regionInfo);
+                 }
+                 catch(Exception e) {
+                                 LOG.error("Unable to parse region byte array, " + e);
+                                 throw e;
+                 }
+                 /*
+                 ByteArrayInputStream lv_bis = new ByteArrayInputStream(regionInfo);
+                 DataInputStream lv_dis = new DataInputStream(lv_bis);
+                 try {
+                         regionInfoLoc.readFields(lv_dis);
+                 } catch (Exception e) {
+                         throw new Exception();
+                 }
+                 */
+                 //HBase98 TODO: need to set the value of startcode correctly
+                 //HBase98 TODO: Not in CDH 5.1:  ServerName lv_servername = ServerName.valueOf(hostname, port, 0);
+
+                 String lv_hostname_port_string = hostname + ":" + port;
+                 String lv_servername_string = ServerName.getServerName(lv_hostname_port_string, 0);
+                 ServerName lv_servername = ServerName.parseServerName(lv_servername_string);
+
+                 TransactionRegionLocation loc = new TransactionRegionLocation(regionInfoLoc, lv_servername);
+                 ts.addRegion(loc);
+             }
+
+            @Override
+             public void run() {
+
+                while (this.continueThread) {
+                    try {
+                        skipSleep = false;
+                        Map<String, byte[]> regions = null;
+                        Map<Long, TransactionState> transactionStates =
+                                new HashMap<Long, TransactionState>();
+                        try {
+                            regions = zookeeper.checkForRecovery();
+                        } catch (Exception e) {
+                            if (regions != null) { // ignore no object returned by zookeeper.checkForRecovery
+                               LOG.error("An ERROR occurred while checking for regions to recover. " + "TM: " + tmID);
+                               StringWriter sw = new StringWriter();
+                               PrintWriter pw = new PrintWriter(sw);
+                               e.printStackTrace(pw);
+                               LOG.error(sw.toString());
+                            }
+                        }
+
+                        if(regions != null) {
+                            skipSleep = true;
+                            recoveryIterations++;
+
+                            if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt region size " + regions.size());
+                            for (Map.Entry<String, byte[]> regionEntry : regions.entrySet()) {
+                                List<Long> TxRecoverList = new ArrayList<Long>();
+                                String hostnamePort = regionEntry.getKey();
+                                byte[] regionBytes = regionEntry.getValue();
+                                if (LOG.isDebugEnabled())
+                                    LOG.debug("TRAF RCOV THREAD:Recovery Thread Processing region: " + new String(regionBytes));
+                                if (recoveryIterations == 0) {
+                                   if(LOG.isWarnEnabled()) {
+                                      //  Let's get the host name
+                                      final byte [] delimiter = ",".getBytes();
+                                      String[] hostname = hostnamePort.split(new String(delimiter), 3);
+                                      if (hostname.length < 2) {
+                                         throw new IllegalArgumentException("hostnamePort format is incorrect");
+                                      }
+
+                                      LOG.warn ("TRAF RCOV THREAD:Starting recovery with " + regions.size() +
+                                           " regions to recover.  First region hostname: " + hostnamePort +
+                                           " Recovery iterations: " + recoveryIterations);
+                                   }
+                                }
+                                else {
+                                   if(recoveryIterations % 10 == 0) {
+                                      if(LOG.isWarnEnabled()) {
+                                         //  Let's get the host name
+                                         final byte [] delimiter = ",".getBytes();
+                                         String[] hostname = hostnamePort.split(new String(delimiter), 3);
+                                         if (hostname.length < 2) {
+                                            throw new IllegalArgumentException("hostnamePort format is incorrect");
+                                         }
+                                         LOG.warn("TRAF RCOV THREAD:Recovery thread encountered " + regions.size() +
+                                           " regions to recover.  First region hostname: " + hostnamePort +
+                                           " Recovery iterations: " + recoveryIterations);
+                                      }
+                                   }
+                                }
+                                try {
+                                    TxRecoverList = txnManager.recoveryRequest(hostnamePort, regionBytes, tmID);
+                                }catch (NotServingRegionException e) {
+                                   TxRecoverList = null;
+                                   LOG.error("TRAF RCOV THREAD:NotServingRegionException calling recoveryRequest. regionBytes: " + new String(regionBytes) +
+                                             " TM: " + tmID + " hostnamePort: " + hostnamePort);
+
+                                   try {
+                                      // First delete the zookeeper entry
+                                      LOG.error("TRAF RCOV THREAD:recoveryRequest. Deleting region entry Entry: " + regionEntry);
+                                      zookeeper.deleteRegionEntry(regionEntry);
+                                   }
+                                   catch (Exception e2) {
+                                      LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " +
+                                      new String(regionEntry.getValue()) + " exception: " + e2);
+                                   }
+                                   try {
+                                      // Create a local HTable object using the regionInfo
+                                      HTable table = new HTable(config, HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString());
+                                      try {
+                                         // Repost a zookeeper entry for all current regions in the table
+                                         zookeeper.postAllRegionEntries(table);
+                                      }
+                                      catch (Exception e2) {
+                                         LOG.error("TRAF RCOV THREAD:Error calling postAllRegionEntries. table: " + new String(table.getTableName()) + " exception: " + e2);
+                                      }
+                                   }// try
+                                   catch (Exception e1) {
+                                      LOG.error("TRAF RCOV THREAD:recoveryRequest exception in new HTable " + HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString() + " Exception: " + e1);
+                                   }
+                                }// NotServingRegionException
+                                catch (TableNotFoundException tnfe) {
+                                   // In this case there is nothing to recover.  We just need to delete the region entry.
+                                   try {
+                                      // First delete the zookeeper entry
+                                      LOG.warn("TRAF RCOV THREAD:TableNotFoundException calling txnManager.recoveryRequest. " + "TM: " +
+                                              tmID + " regionBytes: [" + regionBytes + "].  Deleting zookeeper region entry. \n exception: " + tnfe);
+                                      zookeeper.deleteRegionEntry(regionEntry);
+                                   }
+                                   catch (Exception e2) {
+                                      LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " +
+                                      new String(regionEntry.getValue()) + " exception: " + e2);
+                                   }
+
+                                }// TableNotFoundException
+                                catch (DeserializationException de) {
+                                   // We are unable to parse the region info from ZooKeeper  We just need to delete the region entry.
+                                   try {
+                                      // First delete the zookeeper entry
+                                      LOG.warn("TRAF RCOV THREAD:DeserializationException calling txnManager.recoveryRequest. " + "TM: " +
+                                              tmID + " regionBytes: [" + regionBytes + "].  Deleting zookeeper region entry. \n exception: " + de);
+                                      zookeeper.deleteRegionEntry(regionEntry);
+                                   }
+                                   catch (Exception e2) {
+                                      LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " +
+                                      new String(regionEntry.getValue()) + " exception: " + e2);
+                                   }
+
+                                }// DeserializationException
+                                catch (Exception e) {
+                                   LOG.error("TRAF RCOV THREAD:An ERROR occurred calling txnManager.recoveryRequest. " + "TM: " +
+                                              tmID + " regionBytes: [" + regionBytes + "] exception: " + e);
+                                }
+                                if (TxRecoverList != null) {
+                                    if (LOG.isDebugEnabled()) LOG.trace("TRAF RCOV THREAD:size of TxRecoverList " + TxRecoverList.size());
+                                    if (TxRecoverList.size() == 0) {
+                                      try {
+                                         // First delete the zookeeper entry
+                                         LOG.warn("TRAF RCOV THREAD:Leftover Znode  calling txnManager.recoveryRequest. " + "TM: " +
+                                                 tmID + " regionBytes: [" + regionBytes + "].  Deleting zookeeper region entry. ");
+                                         zookeeper.deleteRegionEntry(regionEntry);
+                                      }
+                                      catch (Exception e2) {
+                                         LOG.error("TRAF RCOV THREAD:Error calling deleteRegionEntry. regionEntry key: " + regionEntry.getKey() + " regionEntry value: " +
+                                         new String(regionEntry.getValue()) + " exception: " + e2);
+                                      }
+                                   }
+                                   for (Long txid : TxRecoverList) {
+                                      TransactionState ts = transactionStates.get(txid);
+                                      if (ts == null) {
+                                         ts = new TransactionState(txid);
+
+                                         //Identify if DDL is part of this transaction and valid
+                                         if(hbtx.useDDLTrans){
+                                            TmDDL tmDDL = hbtx.getTmDDL();
+                                            StringBuilder state = new StringBuilder ();
+                                            try {
+                                                tmDDL.getState(txid,state);
+                                            }
+                                            catch(Exception egetState){
+                                                LOG.error("TRAF RCOV THREAD:Error calling TmDDL getState." + egetState);
+                                            }
+                                            if(state.toString().equals("VALID"))
+                                               ts.setDDLTx(true);
+                                         }
+                                      }
+                                      try {
+                                         this.addRegionToTS(hostnamePort, regionBytes, ts);
+                                      } catch (Exception e) {
+                                         LOG.error("TRAF RCOV THREAD:Unable to add region to TransactionState, region info: " + new String(regionBytes));
+                                         e.printStackTrace();
+                                      }
+                                      transactionStates.put(txid, ts);
+                                   }
+                                }
+                                else if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD:size od TxRecoverList is NULL ");
+                            }
+                            if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt transaction size " + transactionStates.size());
+                            for (Map.Entry<Long, TransactionState> tsEntry : transactionStates.entrySet()) {
+                                TransactionState ts = tsEntry.getValue();
+                                Long txID = ts.getTransactionId();
+                                // TransactionState ts = new TransactionState(txID);
+                                try {
+                                    audit.getTransactionState(ts);
+                                    if (ts.getStatus().equals(TransState.STATE_COMMITTED.toString())) {
+                                        if (LOG.isDebugEnabled())
+                                            LOG.debug("TRAF RCOV THREAD:Redriving commit for " + txID + " number of regions " + ts.getParticipatingRegions().size() +
+                                                    " and tolerating UnknownTransactionExceptions");
+                                        txnManager.doCommit(ts, true /*ignore UnknownTransactionException*/);
+                                        if(useTlog && useForgotten) {
+                                            long nextAsn = tLog.getNextAuditSeqNum((int)(txID >> 32));
+                                            tLog.putSingleRecord(txID, ts.getCommitId(), "FORGOTTEN", null, forceForgotten, nextAsn);
+                                        }
+                                    } else if (ts.getStatus().equals(TransState.STATE_ABORTED.toString())) {
+                                        if (LOG.isDebugEnabled())
+                                            LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
+                                        txnManager.abort(ts);
+                                    } else {
+                                        if (LOG.isDebugEnabled())
+                                            LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
+                                        LOG.warn("Recovering transaction " + txID + ", status is not set to COMMITTED or ABORTED. Aborting.");
+                                        txnManager.abort(ts);
+                                    }
+
+                                }catch (UnsuccessfulDDLException ddle) {
+                                    LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " + ddle);
+                                    ddle.printStackTrace();
+
+                                    //Note that there may not be anymore redrive triggers from region server point of view for DDL operation.
+                                    //Register this DDL transaction for subsequent redrive from Audit Control Event.
+                                    //TODO: Launch a new Redrive Thread out of auditControlPoint().
+                                    TmDDL tmDDL = hbtx.getTmDDL();
+                                    try {
+                                        tmDDL.setState(txID,"REDRIVE");
+                                    }
+                                    catch(Exception e){
+                                         LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive" + e);
+                                    }
+                                }
+                                catch (Exception e) {
+                                    LOG.error("Unable to get audit record for tx: " + txID + ", audit is throwing exception.");
+                                    e.printStackTrace();
+                                }
+                            }
+
+                        }
+                        else {
+                            if (recoveryIterations > 0) {
+                                if(LOG.isInfoEnabled()) LOG.info("Recovery completed for TM" + tmID);
+                            }
+                            recoveryIterations = -1;
+                        }
+                        try {
+                            if(continueThread) {
+                                if(!skipSleep) {
+                                    if (sleepTimeInt > 0)
+                                        Thread.sleep(sleepTimeInt);
+                                    else
+                                        Thread.sleep(SLEEP_DELAY);
+                                }
+                            }
+                            retryCount = 0;
+                        } catch (Exception e) {
+                            LOG.error("Error in recoveryThread: " + e);
+                        }
+
+                    } catch (Exception e) {
+                        int possibleRetries = 4;
+                        LOG.error("Caught recovery thread exception for tmid: " + tmID + " retries: " + retryCount);
+                        StringWriter sw = new StringWriter();
+                        PrintWriter pw = new PrintWriter(sw);
+                        e.printStackTrace(pw);
+                        LOG.error(sw.toString());
+
+                        retryCount++;
+                        if(retryCount > possibleRetries) {
+                            LOG.error("Recovery thread failure, aborting process");
+                            System.exit(4);
+                        }
+
+                        try {
+                            Thread.sleep(SLEEP_DELAY / possibleRetries);
+                        } catch(Exception se) {
+                            LOG.error(se);
+                        }
+                    }
+                }
+                if(LOG.isDebugEnabled()) LOG.debug("Exiting recovery thread for tm ID: " + tmID);
+            }
+     }
+
+     //================================================================================
+     // DTMCI Calls
+     //================================================================================
+
+     //--------------------------------------------------------------------------------
+     // callRequestRegionInfo
+     // Purpose: Prepares HashMapArray class to get region information
+     //--------------------------------------------------------------------------------
+      public HashMapArray callRequestRegionInfo() throws Exception {
+
+      String tablename, encoded_region_name, region_name, is_offline, region_id, hostname, port, thn;
+
+      HashMap<String, String> inMap;
+      long lv_ret = -1;
+      Long key;
+      TransactionState value;
+      int tnum = 0; // Transaction number
+
+      if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient::callRequestRegionInfo:: start\n");
+
+      HashMapArray hm = new HashMapArray();
+
+      try{
+      for(ConcurrentHashMap.Entry<Long, TransactionState> entry : mapTransactionStates.entrySet()){
+          key = entry.getKey();
+          value = entry.getValue();
+          long id = value.getTransactionId();
+
+          TransactionState ts = mapTransactionStates.get(id);
+          final Set<TransactionRegionLocation> regions = ts.getParticipatingRegions();
+
+          // TableName
+          Iterator<TransactionRegionLocation> it = regions.iterator();
+          tablename = it.next().getRegionInfo().getTable().getNameAsString();
+          while(it.hasNext()){
+              tablename = tablename + ";" + it.next().getRegionInfo().getTable().getNameAsString();
+          }
+          hm.addElement(tnum, "TableName", tablename);
+
+          // Encoded Region Name
+          Iterator<TransactionRegionLocation> it2 = regions.iterator();
+          encoded_region_name = it2.next().getRegionInfo().getEncodedName();
+          while(it2.hasNext()){
+              encoded_region_name = encoded_region_name + ";" + it2.next().getRegionInfo().getTable().getNameAsString();
+          }
+          hm.addElement(tnum, "EncodedRegionName", encoded_region_name);
+
+          // Region Name
+          Iterator<TransactionRegionLocation> it3 = regions.iterator();
+          region_name = it3.next().getRegionInfo().getRegionNameAsString();
+          while(it3.hasNext()){
+              region_name = region_name + ";" + it3.next().getRegionInfo().getTable().getNameAsString();
+          }
+          hm.addElement(tnum, "RegionName", region_name);
+
+          // Region Offline
+          Iterator<TransactionRegionLocation> it4 = regions.iterator();
+          boolean is_offline_bool = it4.next().getRegionInfo().isOffline();
+          is_offline = String.valueOf(is_offline_bool);
+          hm.addElement(tnum, "RegionOffline", is_offline);
+
+          // Region ID
+          Iterator<TransactionRegionLocation> it5 = regions.iterator();
+          region_id = String.valueOf(it5.next().getRegionInfo().getRegionId());
+          while(it5.hasNext()){
+              region_id = region_id + ";" + it5.next().getRegionInfo().getRegionId();
+          }
+          hm.addElement(tnum, "RegionID", region_id);
+
+          // Hostname
+          Iterator<TransactionRegionLocation> it6 = regions.iterator();
+          thn = String.valueOf(it6.next().getHostname());
+          hostname = thn.substring(0, thn.length()-1);
+          while(it6.hasNext()){
+              thn = String.valueOf(it6.next().getHostname());
+              hostname = hostname + ";" + thn.substring(0, thn.length()-1);
+          }
+          hm.addElement(tnum, "Hostname", hostname);
+
+          // Port
+          Iterator<TransactionRegionLocation> it7 = regions.iterator();
+          port = String.valueOf(it7.next().getPort());
+          while(it7.hasNext()){
+              port = port + ";" + String.valueOf(it7.next().getPort());
+          }
+          hm.addElement(tnum, "Port", port);
+
+          tnum = tnum + 1;
+        }
+      }catch(Exception e){
+         if (LOG.isTraceEnabled()) LOG.trace("Error in getting region info. Map might be empty. Please ensure sqlci insert was done");
+      }
+
+      if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient::callRequestRegionInfo:: end size: " + hm.getSize());
+      return hm;
+   }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/77eab6ba/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HashMapArray.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HashMapArray.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HashMapArray.java
new file mode 100644
index 0000000..0f31128
--- /dev/null
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HashMapArray.java
@@ -0,0 +1,229 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+
+package org.trafodion.dtm;
+
+import java.util.*;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+
+public class HashMapArray {
+
+    private HashMap<String, String> inMap;
+    private ArrayList<HashMap<String, String>> outList;
+
+    public HashMapArray(){
+       inMap = new HashMap<String, String>();
+       outList = new ArrayList<HashMap<String, String>>();
+    }
+
+    //-------------------------------------------------------
+    // addElement
+    // Purpose: Adds element to the HashMap
+    //-------------------------------------------------------
+    public void addElement(int tnum, String key, String value) {
+       if(!outList.isEmpty()){
+          try{
+             inMap = outList.get(tnum);
+             if(inMap.containsKey(key)){
+                return;
+             }
+             else{
+                inMap.put(key, value);
+                return;
+             }
+          }catch ( IndexOutOfBoundsException e){
+             inMap = new HashMap<String, String>();
+             inMap.put(key, value);
+             outList.add(inMap);
+             return;
+          }
+       }
+       // List is empty or map for transid does not exist.
+       inMap = new HashMap<String, String>();
+       inMap.put(key, value);
+       outList.add(inMap);
+   }
+
+   //---------------------------------------------------------
+   // getElement
+   // Purpose: gets element from the HashMap
+   //---------------------------------------------------------
+   public String getElement(int index){
+       inMap = outList.get(index);
+        
+       if (inMap==null) {
+          System.out.println("HashMapArray::getElement:: inMap NULL");
+          return null;
+       }
+       for(Map.Entry<String, String> entry : inMap.entrySet()){
+          String key = entry.getKey();
+          String value = entry.getValue();
+       }
+      return inMap.get("Transid").toString();
+    }
+
+    //-------------------------------------------------------
+    // getTableName
+    // Purpose: gets tableName from the HashMap
+    //-------------------------------------------------------
+    public String getTableName(int index){
+
+       inMap = outList.get(index);
+       if (inMap==null) {
+          System.out.println("HashMapArray::getTableName:: inMap NULL");
+          return null;
+       }
+       return inMap.get("TableName").toString(); 
+    }
+
+    //-------------------------------------------------------
+    // getEncodedRegionName
+    // Purpose: gets EncodedRegionName from the HashMap
+    //-------------------------------------------------------
+    public String getEncodedRegionName(int index){
+
+       inMap = outList.get(index);
+       if (inMap==null) {
+          System.out.println("HashMapArray::getEncodedRegionName:: inMap NULL");
+          return null;
+       }
+       return inMap.get("EncodedRegionName").toString();
+    }
+
+    //-------------------------------------------------------
+    // getRegionName
+    // Purpose: gets regionName from the HashMap
+    //-------------------------------------------------------
+    public String getRegionName(int index){
+
+       inMap = outList.get(index);
+       if (inMap==null) {
+          System.out.println("HashMapArray::getRegionName:: inMap NULL");
+          return null;
+       }
+       return inMap.get("RegionName").toString();
+    }
+
+    //-------------------------------------------------------
+    // getRegionOfflineStatus
+    // Purpose: gets RegionOfflineStatus from the HashMap
+    //-------------------------------------------------------
+    public String getRegionOfflineStatus(int index){
+
+       inMap = outList.get(index);
+       if (inMap==null) {
+          System.out.println("HashMapArray::getRegionOfflineStatus:: inMap NULL");
+          return null;
+       }
+       return inMap.get("RegionOffline").toString();
+    }
+
+    //-------------------------------------------------------
+    // getRegionId
+    // Purpose: gets RegionId from the HashMap
+    //-------------------------------------------------------
+    public String getRegionId(int index){
+
+       inMap = outList.get(index);
+       if (inMap==null) {
+          System.out.println("HashMapArray::getRegionId:: inMap NULL");
+          return null;
+       }
+       return inMap.get("RegionID").toString();
+    }
+
+    //-------------------------------------------------------
+    // getHostName
+    // Purpose: gets HostName from the HashMap
+    //-------------------------------------------------------
+    public String getHostName(int index){
+
+       inMap = outList.get(index);
+       if (inMap==null) {
+          System.out.println("HashMapArray::getHostName:: inMap NULL");
+          return null;
+       }
+       return inMap.get("Hostname").toString();
+    }
+
+    //-------------------------------------------------------
+    // getPort
+    // Purpose: gets Port from the HashMap
+    //-------------------------------------------------------
+    public String getPort(int index){
+
+       inMap = outList.get(index);
+       if (inMap==null) {
+          System.out.println("HashMapArray::getPort:: inMap NULL");
+          return null;
+       }
+       return inMap.get("Port").toString();
+    }
+
+    //-------------------------------------------------------
+    // getSize
+    // Purpose: gets size of the list
+    //-------------------------------------------------------
+    public int getSize() {
+       int listSize = outList.size();
+       return listSize;
+    }
+
+    public HashMap<String, String> getInMap(int index){
+       inMap = outList.get(index);
+       return inMap;
+    } 
+
+ 
+    public static void main(String[] Args) {
+       HashMap<String, String> map; 
+      
+       HashMapArray hm = new HashMapArray();
+
+       hm.addElement(0, "Niece", "Caro");
+       hm.addElement(0, "Mom", "Doris");
+       hm.addElement(1, "Pet", "Nenna");
+       hm.addElement(1, "Dad", "Andres");
+       hm.addElement(1, "TableName", "SEABASE.T1");
+
+       map = hm.getInMap(0);
+
+       System.out.println();
+       
+
+       for(int i=0; i<hm.getSize(); i++){
+          map = hm.getInMap(i);
+          for(Map.Entry<String, String> entry : map.entrySet()){
+             String key = entry.getKey();
+             String value = entry.getValue();
+
+             System.out.println("index: " + i + "  " + key + " = " + value);
+          }
+       }
+    }
+}
+
+
+