You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by su...@apache.org on 2015/10/22 01:56:50 UTC

[4/6] incubator-trafodion git commit: [TRAFODION-34]Support region splitting/balancing

[TRAFODION-34]Support region splitting/balancing

With transactions active. Functionality being checked in disabled.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/65d0f1cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/65d0f1cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/65d0f1cf

Branch: refs/heads/master
Commit: 65d0f1cfe43cb51077227277e5b6321fd69aa442
Parents: 46f7be4
Author: Oliver Bucaojit <ol...@esgyn.com>
Authored: Tue Oct 20 04:24:54 2015 +0000
Committer: Oliver Bucaojit <ol...@esgyn.com>
Committed: Tue Oct 20 04:24:54 2015 +0000

----------------------------------------------------------------------
 .../transactional/TransactionManager.java       |  204 +-
 .../client/transactional/TransactionState.java  |   10 +
 .../transactional/TransactionalReturn.java      |    1 +
 .../transactional/TransactionalTable.java       |  285 +-
 .../transactional/SplitBalanceHelper.java       |  296 ++
 .../transactional/TrxRegionEndpoint.java        |  272 +-
 .../transactional/TrxRegionObserver.java        |  232 +-
 .../generated/TrxRegionProtos.java              | 3001 +++++++++++++++++-
 .../transactional/TransactionState.java         |   13 +
 .../transactional/TrxTransactionState.java      |    6 +
 .../hbase-trx/src/main/protobuf/TrxRegion.proto |   21 +
 11 files changed, 4067 insertions(+), 274 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
index 53b0bde..f8f5eaf 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
@@ -44,12 +44,14 @@
 
 package org.apache.hadoop.hbase.client.transactional;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Collection;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -62,6 +64,8 @@ import java.util.HashMap;
 
 import org.apache.commons.codec.binary.Hex;
 
+import org.apache.hadoop.fs.Path;
+
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -282,7 +286,10 @@ public class TransactionManager {
         }
         startKey = location.getRegionInfo().getStartKey();
         endKey_orig = location.getRegionInfo().getEndKey();
-        endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
+        if(endKey_orig == null || endKey_orig == HConstants.EMPTY_END_ROW)
+          endKey = null;
+        else
+          endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
     }
 
     /**
@@ -334,12 +341,12 @@ public class TransactionManager {
                   LOG.error(msg + ":" + e);
                   throw new Exception(msg);
                }
-               if(result.size() != 1) {
+               if(result.size() == 0) {
                   LOG.error("doCommitX, received incorrect result size: " + result.size() + " txid: " + transactionId);
                   refresh = true;
                   retry = true;
                }
-               else {
+               else if(result.size() == 1){
                   // size is 1
                   for (CommitResponse cresponse : result.values()){
                     if(cresponse.getHasException()) {
@@ -361,6 +368,31 @@ public class TransactionManager {
                }
                retry = false;
              }
+             else {
+                  for (CommitResponse cresponse : result.values()){
+                    if(cresponse.getHasException()) {
+                      String exceptionString = new String (cresponse.getException().toString());
+                      if (exceptionString.contains("UnknownTransactionException")) {
+                        if (ignoreUnknownTransactionException == true) {
+                          if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring UnknownTransactionException in cresponse");
+                        }
+                        else {
+                          LOG.error("doCommitX, coprocessor UnknownTransactionException: " + cresponse.getException());
+                          throw new UnknownTransactionException();
+                        }
+                      }
+                      else if(exceptionString.contains("Asked to commit a non-pending transaction")) {
+                        if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring 'commit non-pending transaction' in cresponse");
+                      }
+                      else {
+                        if (LOG.isTraceEnabled()) LOG.trace("doCommitX coprocessor exception: " + cresponse.getException());
+                        throw new Exception(cresponse.getException());
+                      }
+                  }
+               }
+               retry = false;
+             }
+
           }
           catch (UnknownTransactionException ute) {
               LOG.error("Got unknown exception in doCommitX for transaction: " + transactionId + " " + ute);
@@ -368,16 +400,23 @@ public class TransactionManager {
               throw new UnknownTransactionException();
           }
           catch (Exception e) {
-             LOG.error("doCommitX retrying due to Exception: " + e);
-             refresh = true;
-             retry = true;
+             if(e.toString().contains("Asked to commit a non-pending transaction")) {
+               if (LOG.isDebugEnabled()) LOG.debug("doCommitX will not retry: " + e);
+               refresh = false;
+               retry = false;
+             }
+             else {
+               LOG.error("doCommitX retrying due to Exception: " + e);
+               refresh = true;
+               retry = true;
+             }
           }
           if (refresh) {
 
              HRegionLocation lv_hrl = table.getRegionLocation(startKey);
              HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-             String          lv_node = lv_hrl.getHostname();
-             int             lv_length = lv_node.indexOf('.');
+             //String          lv_node = lv_hrl.getHostname();
+             //int             lv_length = lv_node.indexOf('.');
 
              if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + " endKey: "
                      + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -494,8 +533,6 @@ public class TransactionManager {
 
              HRegionLocation lv_hrl = table.getRegionLocation(startKey);
              HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-             String          lv_node = lv_hrl.getHostname();
-             int             lv_length = lv_node.indexOf('.');
 
              if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
                      + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -585,47 +622,89 @@ public class TransactionManager {
              Map<byte[], CommitRequestResponse> result = null;
 
              try {
-//                if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- before coprocessorService txid: " + transactionId + " table: " + table.toString());
-//                if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- txid: " + transactionId + " table: " + table.toString() + " endKey_Orig: " + new String(endKey_orig, "UTF-8"));
-//                if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- " + table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
-
-//                HRegionLocation lv_hrl = table.getRegionLocation(startKey);
-//                HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-//                String          lv_node = lv_hrl.getHostname();
-//                int             lv_length = lv_node.indexOf('.');
-
-//                if ((location.getRegionInfo().getEncodedName().compareTo(lv_hri.getEncodedName()) != 0) ||  // Encoded name is different
-//                        (location.getHostname().regionMatches(0, lv_node, 0, lv_length) == false)) {            // Node is different
-//                   if (LOG.isInfoEnabled())LOG.info("doPrepareX -- " + table.toString() + " location being refreshed");
-//                   if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- lv_hri: " + lv_hri);
-//                   if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location.getRegionInfo(): " + location.getRegionInfo());
-//                   if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- lv_node: " + lv_node + " lv_length: " + lv_length);
-//                   if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location.getHostname(): " + location.getHostname());
-//                   table.getRegionLocation(startKey, true);
-//                }
                 result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
              } catch (Throwable e) {
                 LOG.error("doPrepareX coprocessor error for " + Bytes.toString(regionName) + " txid: " + transactionId + ":" + e);
                 throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error");
              }
 
-             if(result.size() != 1)  {
+             if(result.size() == 0)  {
                 LOG.error("doPrepareX, received incorrect result size: " + result.size());
                 refresh = true;
                 retry = true;
              }
-             else {
+             else if(result.size() == 1){
                 // size is 1
                 for (CommitRequestResponse cresponse : result.values()){
                    // Should only be one result
                    int value = cresponse.getResult();
                    commitStatus = value;
                    if(cresponse.getHasException()) {
-                      if (LOG.isTraceEnabled()) LOG.trace("doPrepareX coprocessor exception: " + cresponse.getException());
-                      throw new Exception(cresponse.getException());
+                      if(transactionState.hasRetried() &&
+                          cresponse.getException().contains("encountered unknown transactionID")) {
+                        retry = false;
+                        commitStatus = TransactionalReturn.COMMIT_OK_READ_ONLY;
+                      }
+                      else {
+                        if (LOG.isTraceEnabled()) LOG.trace("doPrepareX coprocessor exception: " + cresponse.getException());
+                        throw new Exception(cresponse.getException());
+                      }
+                   }
+                   if(value == TransactionalReturn.COMMIT_RESEND) {
+                     // Handle situation where repeated region is in list due to different endKeys
+                     int count = 0;
+                     for(TransactionRegionLocation trl : this.transactionState.getParticipatingRegions()) {
+                       if(trl.getRegionInfo().getTable().toString()
+                               .compareTo(location.getRegionInfo().getTable().toString()) == 0
+                               &&
+                          Arrays.equals(trl.getRegionInfo().getStartKey(),
+                               location.getRegionInfo().getStartKey())) {
+                         count++;
+                       }
+                     }
+                     if(count > 1) {
+                       commitStatus = TransactionalReturn.COMMIT_OK;
+                       retry = false;
+                     }
+                     else {
+                       // Pause for split to complete and retry
+                       Thread.sleep(100);
+                       retry = true;
+                     }
+                   }
+                   else {
+                     retry = false;
                    }
                 }
-                retry = false;
+             }
+             else {
+               for(CommitRequestResponse cresponse : result.values()) {
+                 if(cresponse.getResult() == TransactionalReturn.COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR ||
+                  cresponse.getResult() == TransactionalReturn.COMMIT_CONFLICT ||
+                  cresponse.getResult() == TransactionalReturn.COMMIT_UNSUCCESSFUL ||
+                  commitStatus == 0) {
+                     commitStatus = cresponse.getResult();
+
+                     if(cresponse.getHasException()) {
+                       if(cresponse.getException().contains("encountered unknown transactionID")) {
+                         retry = false;
+                         commitStatus = TransactionalReturn.COMMIT_OK_READ_ONLY;
+                       }
+                       else {
+                         if (LOG.isTraceEnabled()) LOG.trace("doPrepareX coprocessor exception: " +
+                            cresponse.getException());
+                         throw new Exception(cresponse.getException());
+                       }
+                     }
+                 }
+               }
+
+                 if(commitStatus == TransactionalReturn.COMMIT_OK ||
+                    commitStatus == TransactionalReturn.COMMIT_OK_READ_ONLY ||
+                    commitStatus == TransactionalReturn.COMMIT_RESEND) {
+                   commitStatus = TransactionalReturn.COMMIT_OK;
+                 }
+               retry = false;
              }
           }
           catch(UnknownTransactionException ute) {
@@ -641,8 +720,6 @@ public class TransactionManager {
 
              HRegionLocation lv_hrl = table.getRegionLocation(startKey);
              HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-             String          lv_node = lv_hrl.getHostname();
-             int             lv_length = lv_node.indexOf('.');
 
              if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
                      + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -742,8 +819,6 @@ public class TransactionManager {
 
              HRegionLocation lv_hrl = table.getRegionLocation(startKey);
              HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-             String          lv_node = lv_hrl.getHostname();
-             int             lv_length = lv_node.indexOf('.');
 
              if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
                      + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -789,6 +864,7 @@ public class TransactionManager {
        switch (commitStatus) {
           case TransactionalReturn.COMMIT_OK:
             break;
+          case TransactionalReturn.COMMIT_RESEND:
           case TransactionalReturn.COMMIT_OK_READ_ONLY:
             transactionState.addRegionToIgnore(location); // No need to doCommit for read-onlys
             readOnly = true;
@@ -870,39 +946,45 @@ public class TransactionManager {
                   throw new Exception(msg);
               }
 
-              if(result.size() != 1) {
-                     LOG.error("doAbortX, received incorrect result size: " + result.size());
+              if(result.size() == 0) {
+                     LOG.error("doAbortX, received 0 region results.");
                      refresh = true;
                      retry = true;
-                  }
-                  else {
-                     for (AbortTransactionResponse cresponse : result.values()) {
-                if(cresponse.getHasException()) {
-                  String exceptionString = cresponse.getException().toString();
-                  LOG.error("Abort HasException true: " + exceptionString);
-                  if(exceptionString.contains("UnknownTransactionException")) {
-                         throw new UnknownTransactionException();
-                  }
-                  throw new Exception(cresponse.getException());
-                }
               }
-              retry = false;
-          }
+              else {
+                 for (AbortTransactionResponse cresponse : result.values()) {
+                   if(cresponse.getHasException()) {
+                     String exceptionString = cresponse.getException().toString();
+                     LOG.error("Abort HasException true: " + exceptionString);
+                     if(exceptionString.contains("UnknownTransactionException")) {
+                       throw new UnknownTransactionException();
+                     }
+                     throw new Exception(cresponse.getException());
+                   }
+                 }
+                 retry = false;
               }
+           }
           catch (UnknownTransactionException ute) {
                  LOG.debug("UnknownTransactionException in doAbortX for transaction: " + transactionId + "(ignoring): " + ute);
           }
           catch (Exception e) {
-                LOG.error("doAbortX retrying due to Exception: " + e );
-                refresh = true;
-                retry = true;
+                if(e.toString().contains("Asked to commit a non-pending transaction")) {
+                  LOG.error("doCommitX will not retry: " + e);
+                  refresh = false;
+                  retry = false;
+                }
+                else {
+                    LOG.error("doAbortX retrying due to Exception: " + e );
+                    refresh = true;
+                    retry = true;
+                }
+
               }
               if (refresh) {
 
                  HRegionLocation lv_hrl = table.getRegionLocation(startKey);
                  HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-                 String          lv_node = lv_hrl.getHostname();
-                 int             lv_length = lv_node.indexOf('.');
 
                  if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
                      + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -998,8 +1080,6 @@ public class TransactionManager {
 
                  HRegionLocation lv_hrl = table.getRegionLocation(startKey);
                  HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-                 String          lv_node = lv_hrl.getHostname();
-                 int             lv_length = lv_node.indexOf('.');
 
                  if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
                      + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -1098,8 +1178,6 @@ public class TransactionManager {
 
            HRegionLocation lv_hrl = table.getRegionLocation(startKey);
            HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-           String          lv_node = lv_hrl.getHostname();
-           int             lv_length = lv_node.indexOf('.');
 
            if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
                    + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -1179,8 +1257,6 @@ public class TransactionManager {
        if (refresh) {
          HRegionLocation lv_hrl = table.getRegionLocation(startKey);
          HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-         String          lv_node = lv_hrl.getHostname();
-         int             lv_length = lv_node.indexOf('.');
 
          if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
                   + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -1309,8 +1385,6 @@ public class TransactionManager {
          if (refresh) {
             HRegionLocation lv_hrl = table.getRegionLocation(startKey);
             HRegionInfo     lv_hri = lv_hrl.getRegionInfo();
-            String          lv_node = lv_hrl.getHostname();
-            int             lv_length = lv_node.indexOf('.');
 
             if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
                      + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
index 45cf8c9..53c4951 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
@@ -88,6 +88,7 @@ public class TransactionState {
     private boolean ddlTrans;
     private static boolean useConcurrentHM = false;
     private static boolean getCHMVariable = true;
+    private boolean hasRetried = false;
 
     public Set<String> tableNames = Collections.synchronizedSet(new HashSet<String>());
     public Set<TransactionRegionLocation> participatingRegions;
@@ -448,4 +449,13 @@ public class TransactionState {
     public void setDDLTx(final boolean status) {
         this.ddlTrans = status;
     }
+
+    public void setRetried(boolean val) {
+        this.hasRetried = val;
+    }
+
+    public boolean hasRetried() {
+      return this.hasRetried;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java
index e3d3650..3bb2ea4 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java
@@ -34,4 +34,5 @@ public interface TransactionalReturn {
   final int COMMIT_UNSUCCESSFUL = 4;
   /** Status code representing a transaction that cannot be committed due to conflict. */
   final int COMMIT_CONFLICT = 5;
+  final int COMMIT_RESEND = 6;
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
index 5c67129..419664b 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
@@ -44,12 +44,16 @@
 
 package org.apache.hadoop.hbase.client.transactional;
 
+import java.io.File;
+import java.util.Collection;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -61,6 +65,7 @@ import org.apache.commons.codec.binary.Hex;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -75,6 +80,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteRequest;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteResponse;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutRequest;
@@ -99,6 +105,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.regionserver.transactional.SingleVersionDeleteNotSupported;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.fs.Path;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.HBaseZeroCopyByteString;
@@ -113,6 +120,9 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
     static private HConnection connection = null;
     static Configuration       config = HBaseConfiguration.create();
     static ExecutorService     threadPool;
+    static int                 retries = 15;
+    static int                 delay = 1000;
+    private String retryErrMsg = "Coprocessor result is null, retries exhausted";
 
     static {
 	config.set("hbase.hregion.impl", "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion");
@@ -194,27 +204,48 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
        }   
       };  
 
-      Map<byte[], GetTransactionalResponse> result = null;   
+      GetTransactionalResponse result = null;   
       try {
-        result = super.coprocessorService(TrxRegionService.class, get.getRow(), get.getRow(), callable);
+        int retryCount = 0;
+        boolean retry = false;
+        do {
+          Iterator<Map.Entry<byte[], TrxRegionProtos.GetTransactionalResponse>> it = super.coprocessorService(TrxRegionService.class, 
+                                                                                                              get.getRow(), 
+                                                                                                              get.getRow(), 
+                                                                                                              callable)
+                                                                                                              .entrySet().iterator();
+          if(it.hasNext()) {
+            result = it.next().getValue();
+            retry = false;
+          } 
+
+          if(result == null || result.getException().contains("closing region")) {
+            Thread.sleep(TransactionalTable.delay);
+            retry = true;
+            transactionState.setRetried(true);
+            retryCount++;
+          }
+        } while (retryCount < TransactionalTable.retries && retry == true);
       } catch (Throwable e) {
         e.printStackTrace();    
         throw new IOException("ERROR while calling coprocessor");
       }            
-      Collection<GetTransactionalResponse> results = result.values();
+      //Collection<GetTransactionalResponse> results = result.values();
       // Should only be one result, if more than one. Can't handle.
       // Need to test whether '!=' or '>' is correct
-      if (LOG.isTraceEnabled()) LOG.trace("Results count: " + results.size());
+      //if (LOG.isTraceEnabled()) LOG.trace("Results count: " + results.size());
       //if(results.size() != 1)
       //  throw new IOException("Incorrect number of results from coprocessor call");      
-      GetTransactionalResponse[] resultArray = new GetTransactionalResponse[results.size()];    		  
-      results.toArray(resultArray);            
-      if(resultArray.length == 0) 
-    	  throw new IOException("Problem with calling coprocessor, no regions returned result");
+      //GetTransactionalResponse[] resultArray = new GetTransactionalResponse[results.size()];    		  
+      //results.toArray(resultArray);            
+      //if(resultArray.length == 0) 
+    	//  throw new IOException("Problem with calling coprocessor, no regions returned result");
       
-      if(resultArray[0].hasException())
-        throw new IOException(resultArray[0].getException());
-      return ProtobufUtil.toResult(resultArray[0].getResult());      
+      if(result == null)
+        throw new IOException(retryErrMsg);
+      else if(result.hasException())
+        throw new IOException(result.getException());
+      return ProtobufUtil.toResult(result.getResult());      
     }
     
     /**
@@ -254,10 +285,28 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
         };
 
         byte[] row = delete.getRow();
-        Map<byte[], DeleteTransactionalResponse> result = null; 
+        DeleteTransactionalResponse result = null; 
         try {
-          result = super.coprocessorService(TrxRegionService.class, row, row, callable);
-          
+          int retryCount = 0;
+          boolean retry = false;
+          do {
+            Iterator<Map.Entry<byte[], DeleteTransactionalResponse>> it = super.coprocessorService(TrxRegionService.class, 
+                                              row, 
+                                              row, 
+                                              callable)
+                                              .entrySet().iterator();
+            if(it.hasNext()) {
+              result = it.next().getValue();
+              retry = false;
+            }
+
+            if(result == null || result.getException().contains("closing region")) {
+              Thread.sleep(TransactionalTable.delay);
+              retry = true;
+              transactionState.setRetried(true);
+              retryCount++;
+            }
+          } while (retryCount < TransactionalTable.retries && retry == true);
         } catch (ServiceException e) {
           e.printStackTrace();
           throw new IOException();
@@ -265,16 +314,11 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
           t.printStackTrace();
           throw new IOException();
         } 
-        Collection<DeleteTransactionalResponse> results = result.values();
-        //GetTransactionalResponse[] resultArray = (GetTransactionalResponse[]) results.toArray();
-        DeleteTransactionalResponse[] resultArray = new DeleteTransactionalResponse[results.size()];
-        results.toArray(resultArray);
-        
-        if(resultArray.length == 0) 
-      	  throw new IOException("Problem with calling coprocessor, no regions returned result");
 
-        if(resultArray[0].hasException())
-          throw new IOException(resultArray[0].getException());
+        if(result == null)
+          throw new IOException(retryErrMsg);
+        else if(result.hasException())
+          throw new IOException(result.getException());
     }
 
     /**
@@ -318,21 +362,37 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
         return rpcCallback.get();
       }
     };
-    Map<byte[], PutTransactionalResponse> result = null; 
+    PutTransactionalResponse result = null; 
     try {
-      result = super.coprocessorService(TrxRegionService.class, put.getRow(), put.getRow(), callable);
+      int retryCount = 0;
+      boolean retry = false;
+      do {
+        Iterator<Map.Entry<byte[], PutTransactionalResponse>> it= super.coprocessorService(TrxRegionService.class, 
+                                                                                          put.getRow(), 
+                                                                                          put.getRow(), 
+                                                                                          callable)
+                                                                                          .entrySet().iterator();
+        if(it.hasNext()) {
+          result = it.next().getValue();
+          retry = false;
+        }
+
+        if(result == null || result.getException().contains("closing region")) {
+          Thread.sleep(TransactionalTable.delay);
+          retry = true;
+          transactionState.setRetried(true);
+          retryCount++;
+        }
+
+      } while(retryCount < TransactionalTable.retries && retry == true);
     } catch (Throwable e) {
       e.printStackTrace();
       throw new IOException("ERROR while calling coprocessor");
     }    
-    Collection<PutTransactionalResponse> results = result.values();
-    PutTransactionalResponse[] resultArray = new PutTransactionalResponse[results.size()]; 
-    results.toArray(resultArray);
-    if(resultArray.length == 0) 
-  	  throw new IOException("Problem with calling coprocessor, no regions returned result");
-    
-    if(resultArray[0].hasException())
-      throw new IOException(resultArray[0].getException());
+    if(result == null)
+      throw new IOException(retryErrMsg);
+    else if(result.hasException())
+      throw new IOException(result.getException());
     
     // put is void, may not need to check result
     if (LOG.isTraceEnabled()) LOG.trace("TransactionalTable.put EXIT");
@@ -394,22 +454,38 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
         }
       };
 
-      Map<byte[], CheckAndDeleteResponse> result = null;
+      CheckAndDeleteResponse result = null;
       try {
-        result = super.coprocessorService(TrxRegionService.class, delete.getRow(), delete.getRow(), callable);
+        int retryCount = 0;
+        boolean retry = false;
+        do {
+          Iterator<Map.Entry<byte[], CheckAndDeleteResponse>> it = super.coprocessorService(TrxRegionService.class, 
+                                                                                            delete.getRow(), 
+                                                                                            delete.getRow(), 
+                                                                                            callable)
+                                                                                            .entrySet()
+                                                                                            .iterator();
+          if(it.hasNext()) {
+            result = it.next().getValue();
+            retry = false;
+          }
+
+          if(result == null || result.getException().contains("closing region")) {
+            Thread.sleep(TransactionalTable.delay);
+            retry = true;
+            transactionState.setRetried(true);
+            retryCount++;
+          }
+        } while (retryCount < TransactionalTable.retries && retry == true);
       } catch (Throwable e) {
         e.printStackTrace();
         throw new IOException("ERROR while calling coprocessor");
       }
-      
-      Collection<CheckAndDeleteResponse> results = result.values();
-      
-      if(results.size() == 0) 
-    	  throw new IOException("Problem with calling coprocessor, no regions returned result");
-     CheckAndDeleteResponse response = results.iterator().next();
-      if(response.hasException())
-          throw new IOException(response.getException());
-      return response.getResult();
+      if(result == null)
+        throw new IOException(retryErrMsg);
+      else if(result.hasException())
+        throw new IOException(result.getException());
+      return result.getResult();
    }
     
 	public boolean checkAndPut(final TransactionState transactionState,
@@ -460,23 +536,42 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
       }
     };
 
-      Map<byte[], CheckAndPutResponse> result = null;
+      CheckAndPutResponse result = null;
       try {
-	  result = super.coprocessorService(TrxRegionService.class, put.getRow(), put.getRow(), callable);
+        int retryCount = 0;
+        boolean retry = false;
+        do {
+          Iterator<Map.Entry<byte[], CheckAndPutResponse>> it = super.coprocessorService(TrxRegionService.class, 
+                                                                                      put.getRow(), 
+                                                                                      put.getRow(), 
+                                                                                      callable)
+                                                                                      .entrySet()
+                                                                                      .iterator();
+          if(it.hasNext()) {
+            result = it.next().getValue();
+            retry = false;
+          }
+
+          if(result == null || result.getException().contains("closing region")) {
+            Thread.sleep(TransactionalTable.delay);
+            retry = true;
+            transactionState.setRetried(true);
+            retryCount++;
+          }
+        } while (retryCount < TransactionalTable.retries && retry == true);
       } catch (Throwable e) {        
           StringWriter sw = new StringWriter();
           PrintWriter pw = new PrintWriter(sw);
           e.printStackTrace(pw);
-          //sw.toString();
         throw new IOException("ERROR while calling coprocessor " + sw.toString());       
       }
-      Collection<CheckAndPutResponse> results = result.values();
-      if(results.size() == 0) 
-    	  throw new IOException("Problem with calling coprocessor, no regions returned result");
-      CheckAndPutResponse response = results.iterator().next();
-      if(response.hasException())
-          throw new IOException(response.getException());
-      return response.getResult();          
+
+      if(result == null)
+        throw new IOException(retryErrMsg);
+      else if(result.hasException())
+        throw new IOException(result.getException());
+
+      return result.getResult();          
     }
 
        /**
@@ -541,28 +636,38 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
    	      }
    	    };
    	    
-   	   Map<byte[], DeleteMultipleTransactionalResponse> result = null;
+   	   DeleteMultipleTransactionalResponse result = null;
  	      try {
- 	        result = super.coprocessorService(TrxRegionService.class, 
-                                                  entry.getValue().get(0).getRow(),
-                                                  entry.getValue().get(0).getRow(),
-                                                  callable);
+ 	        int retryCount = 0;
+ 	        boolean retry = false;
+ 	        do {
+ 	          Iterator<Map.Entry<byte[], DeleteMultipleTransactionalResponse>> it= super.coprocessorService(TrxRegionService.class, 
+ 	                                            entry.getValue().get(0).getRow(), 
+ 	                                            entry.getValue().get(0).getRow(), 
+ 	                                            callable)
+ 	                                            .entrySet().iterator();
+ 	          if(it.hasNext()) {
+ 	            result = it.next().getValue();
+ 	            retry = false;
+ 	          }
+
+ 	          if(result == null || result.getException().contains("closing region")) {
+ 	            Thread.sleep(TransactionalTable.delay);
+ 	            retry = true;
+ 	            transactionState.setRetried(true);
+ 	            retryCount++;
+ 	          }
+ 	        } while (retryCount < TransactionalTable.retries && retry == true);
+ 	        
  	      } catch (Throwable e) {
  	        e.printStackTrace();
 	        throw new IOException("ERROR while calling coprocessor");
  	      }
-	      if(result.size() > 1) {
-             LOG.error("result size for multiple delete:" + result.size());
-             throw new IOException("Incorrect number of region results");
-	      }
-	      Collection<DeleteMultipleTransactionalResponse> results = result.values();
-	      DeleteMultipleTransactionalResponse[] resultArray = new DeleteMultipleTransactionalResponse[results.size()];
-	      results.toArray(resultArray);
-	      if(resultArray.length == 0) 
-	    	  throw new IOException("Problem with calling coprocessor, no regions returned result");
-	      
-          if (resultArray[0].hasException())
-             throw new IOException(resultArray[0].getException());
+
+             if(result == null)
+               throw new IOException(retryErrMsg);
+             else if (result.hasException())
+               throw new IOException(result.getException());
 	   }
    	}
 
@@ -627,24 +732,36 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
 	        return rpcCallback.get();
 	      }
 	    };
-	    Map<byte[], PutMultipleTransactionalResponse> result = null;
+	    PutMultipleTransactionalResponse result = null;
       try {
-        result = super.coprocessorService(TrxRegionService.class,
-                                          entry.getValue().get(0).getRow(),
-                                          entry.getValue().get(0).getRow(),
-                                          callable);
+        int retryCount = 0;
+        boolean retry = false;
+        do {
+          Iterator<Map.Entry<byte[], PutMultipleTransactionalResponse>> it= super.coprocessorService(TrxRegionService.class, 
+                                            entry.getValue().get(0).getRow(),
+                                            entry.getValue().get(0).getRow(),                                             
+                                            callable)
+                                            .entrySet().iterator();
+          if(it.hasNext()) {
+            result = it.next().getValue();
+            retry = false;
+          }
+
+          if(result == null || result.getException().contains("closing region")) {
+            Thread.sleep(TransactionalTable.delay);
+            retry = true;
+            transactionState.setRetried(true);
+            retryCount++;
+          }
+        } while (retryCount < TransactionalTable.retries && retry == true);
       } catch (Throwable e) {
         e.printStackTrace();
         throw new IOException("ERROR while calling coprocessor");
       }
-      Collection<PutMultipleTransactionalResponse> results = result.values();
-      PutMultipleTransactionalResponse[] resultArray = new PutMultipleTransactionalResponse[results.size()];
-      results.toArray(resultArray);
-      if(resultArray.length == 0) 
-    	  throw new IOException("Problem with calling coprocessor, no regions returned result");
-      
-      if (resultArray[0].hasException()) 
-        throw new IOException(resultArray[0].getException());
+      if(result == null)
+        throw new IOException(retryErrMsg);
+      else if (result.hasException()) 
+        throw new IOException(result.getException());
      }
 		}
 	

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
new file mode 100644
index 0000000..1107bdc
--- /dev/null
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
@@ -0,0 +1,296 @@
+package org.apache.hadoop.hbase.coprocessor.transactional;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionScannerHolder;
+import org.apache.hadoop.hbase.regionserver.transactional.TrxTransactionState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+public class SplitBalanceHelper {
+	private static final Log LOG = LogFactory.getLog(SplitBalanceHelper.class);
+	
+	private Path flushPath;
+	
+	private static String zkTable = "/hbase/table";
+	private static String zSplitBalPath =        TrxRegionObserver.zTrafPath + "splitbalance/";
+	private static String zSplitBalPathNoSlash = TrxRegionObserver.zTrafPath + "splitbalance";
+	private static String SPLIT = "SPLIT";
+	private static String BALANCE = "BALANCE";
+	private static final String FLUSH_PATH = "traf.txn.out";
+	private static AtomicBoolean needsCleanup = new AtomicBoolean(true);
+	private String balancePath;
+	private String splitPath;
+	private String regionPath;
+	
+	private ZooKeeperWatcher zkw;
+	private HRegionInfo hri;
+	private HRegion region;
+	private String tablename;
+	
+	
+	public SplitBalanceHelper(HRegion my_Region, ZooKeeperWatcher zkw) {
+
+	  String fileName = FLUSH_PATH + getTimeStamp();
+		this.region = my_Region;
+		this.hri = my_Region.getRegionInfo();
+		this.zkw = zkw;
+		this.tablename = my_Region.getTableDesc().getNameAsString();
+		try {
+			if (ZKUtil.checkExists(zkw, zSplitBalPathNoSlash) == -1) 
+				ZKUtil.createWithParents(zkw, zSplitBalPathNoSlash);
+		} catch (KeeperException ke) {
+			LOG.error("ERROR: Zookeeper exception: "+ ke);
+		}
+		this.flushPath = new Path(region.getRegionFileSystem().getRegionDir(), fileName);
+		regionPath = zSplitBalPath  + this.tablename + "/" + hri.getEncodedName();
+		balancePath =  regionPath + "/" + BALANCE + "/";
+		splitPath = regionPath + "/" + SPLIT + "/";
+		
+    if (SplitBalanceHelper.needsCleanup.compareAndSet(true, false)) {
+      zkCleanup();
+    }
+	}
+	
+	public Path getPath() {
+		return flushPath;
+	}
+	
+	public boolean getSplit() {
+		return getSplit(null);
+	}
+    
+	public boolean getSplit(StringBuilder path) {
+		try {
+			byte [] splPath = ZKUtil.getData(zkw, splitPath.substring(0,splitPath.length()-1));
+			if (splPath == null) {
+				return false;
+                        }
+			else{
+				if (path != null)
+					path.append(splPath.toString());
+				if(LOG.isDebugEnabled()) LOG.debug("Split information retrieved, path is: " + splPath.toString());
+				return true;
+                        }
+		} catch (Exception e) {
+			if(LOG.isErrorEnabled()) LOG.error("Keeper exception: " + e);
+      return false;
+		} 
+        }
+    
+	public void setSplit(HRegion leftRegion, HRegion rightRegion) throws IOException {
+		String zLeftKey =  zSplitBalPath + leftRegion.getRegionInfo().getEncodedName();
+		String zRightKey = zSplitBalPath + rightRegion.getRegionInfo().getEncodedName();
+		
+		try {
+			if(ZKUtil.checkExists(zkw, balancePath.substring(0,balancePath.length()-1)) != -1) {
+				clearBalance();
+			}
+			if (ZKUtil.checkExists(zkw, zLeftKey) == -1) 
+				ZKUtil.createWithParents(zkw, zLeftKey);
+			if (ZKUtil.checkExists(zkw, zRightKey) == -1) 
+				ZKUtil.createWithParents(zkw, zRightKey);
+			
+			ZKUtil.createAndFailSilent(zkw, zLeftKey + "/" + SPLIT, Bytes.toBytes(flushPath.toString()));
+			ZKUtil.createAndFailSilent(zkw, zRightKey + "/" + SPLIT, Bytes.toBytes(flushPath.toString()));
+			if(LOG.isDebugEnabled()) LOG.debug("Split coordination node written for " + leftRegion.getRegionNameAsString() +
+					                           " and " + rightRegion.getRegionNameAsString());
+			
+		} catch (KeeperException ke) {
+			LOG.error("ERROR: Zookeeper exception: "+ ke);
+		}
+	}
+	
+	public void setSplit() {
+	   
+    try {
+      if (ZKUtil.checkExists(zkw, balancePath.substring(0, balancePath.length()-1)) != -1) {
+        clearBalance();
+      }
+      if (ZKUtil.checkExists(zkw, splitPath.substring(0, splitPath.length()-1)) == -1) {
+        ZKUtil.createWithParents(zkw, splitPath.substring(0, splitPath.length()-1));
+      }
+
+      ZKUtil.createSetData(zkw, splitPath.substring(0, splitPath.length()-1), Bytes.toBytes(flushPath.toString()));
+      if(LOG.isDebugEnabled()) LOG.debug("Setting split coordination node for " + hri.getRegionNameAsString());
+      
+    } catch (KeeperException ke) {
+      LOG.error("ERROR: Zookeeper exception: "+ ke);
+    }
+	}
+	
+	public void clearSplit() {
+		if(LOG.isTraceEnabled()) LOG.trace("clearSplit called for region: " + this.hri.getRegionNameAsString());
+		try {
+			ZKUtil.deleteNodeRecursively(zkw, regionPath);
+		} catch (KeeperException ke) {
+			LOG.error("Zookeeper exception: " + ke);
+		}
+	}
+	
+	public boolean getBalance(StringBuilder path) {
+		try {
+		
+			byte [] balPath = ZKUtil.getData(zkw, balancePath.substring(0, balancePath.length()-1));
+			if (balPath == null)
+				return false;
+			else{
+				path.append(new String(balPath));
+				if(LOG.isDebugEnabled()) LOG.debug("Balance information retrieved, path is: " + new String(balPath));
+				return true;
+			}
+    } catch (Exception e) {
+			if(LOG.isErrorEnabled()) LOG.error("Keeper exception: " + e);
+    }
+		return true;
+	}
+	
+	public void setBalance() throws IOException {
+		
+		try {
+			if (ZKUtil.checkExists(zkw, splitPath.substring(0, splitPath.length()-1)) != -1) {
+				throw new IOException("SPLIT node already exists when trying to add BALANCE node");
+			}
+
+			if (ZKUtil.checkExists(zkw, balancePath.substring(0, balancePath.length()-1)) == -1) {
+				ZKUtil.createWithParents(zkw, balancePath.substring(0, balancePath.length()-1));
+			}
+			ZKUtil.createSetData(zkw, balancePath.substring(0, balancePath.length()-1), Bytes.toBytes(flushPath.toString()));
+			if(LOG.isDebugEnabled()) LOG.debug("Setting balance coordination node for " + hri.getRegionNameAsString());
+			
+		} catch (KeeperException ke) {
+			LOG.error("ERROR: Zookeeper exception: "+ ke);
+		}
+	}
+	
+	public void clearBalance() {
+		if(LOG.isTraceEnabled()) LOG.trace("clearBalance called for region: " + this.hri.getRegionNameAsString());
+		try {
+			ZKUtil.deleteNodeRecursively(zkw, regionPath);
+		} catch (KeeperException ke) {
+			LOG.error("Zookeeper exception: " + ke);
+		}
+	}
+	
+	private long getTimeStamp() {
+		return System.currentTimeMillis();
+	}
+	
+	protected boolean pendingListClear(Set<TrxTransactionState> commitPendingTransactions) throws IOException {
+	  return commitPendingTransactions.isEmpty();
+	}
+	
+	 protected boolean scannersListClear(ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scanners) throws IOException {
+	    return scanners.isEmpty();
+	  }
+	
+  protected void pendingWait(Set<TrxTransactionState> commitPendingTransactions, int pendingDelayLen) throws IOException {
+      int count = 1;
+      while(!pendingListClear(commitPendingTransactions)) {
+          try {
+              if(LOG.isDebugEnabled()) LOG.debug("pendingWait() delay, count " + count++ + " on: " + hri.getRegionNameAsString());
+              Thread.sleep(pendingDelayLen);
+          } catch(InterruptedException e) {
+              String error = "Problem while calling sleep() on pendingWait delay, " + e;
+              if(LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on preSplit delay, returning. " + e);
+              throw new IOException(error);
+          }
+      }
+  }
+    
+  protected void scannersWait(ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scanners, int pendingDelayLen) 
+      throws IOException {
+    int count = 1;
+    while(!scannersListClear(scanners)) {
+        try {
+            if(LOG.isDebugEnabled()) LOG.debug("scannersWait() delay, count " + count++ + " on: " + hri.getRegionNameAsString());
+            Thread.sleep(pendingDelayLen);
+        } catch(InterruptedException e) {
+            String error = "Problem while calling sleep() on scannersWait delay, " + e;
+            if(LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on preSplit delay, returning. " + e);
+            throw new IOException(error);
+        }
+    }
+  }
+  
+  protected void pendingAndScannersWait(Set<TrxTransactionState> commitPendingTransactions, 
+                                        ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scanners,
+                                        int pendingDelayLen) throws IOException {
+    int count = 1;
+    while(!scannersListClear(scanners) && !pendingListClear(commitPendingTransactions)) {
+      try {
+          if(LOG.isDebugEnabled()) LOG.debug("pendingAndScannersWait() delay, count " + count++ + " on: " + hri.getRegionNameAsString());
+          Thread.sleep(pendingDelayLen);
+      } catch(InterruptedException e) {
+          String error = "Problem while calling sleep() on pendingAndScannersWait delay, " + e;
+          if(LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on pendingAndScannersWait delay, returning. " + e);
+          throw new IOException(error);
+      }
+    }
+  }
+
+  protected void activeWait(ConcurrentHashMap<String, TrxTransactionState> transactionsById, int activeDelayLen, int splitDelayLimit) throws IOException {
+      int counter = 0;
+      int minutes = 0;
+      int currentMin = 0;
+
+      boolean delayMsg = false;
+      while(!transactionsById.isEmpty()) {
+          try {
+              delayMsg = true;
+              Thread.sleep(activeDelayLen);
+              counter++;
+              currentMin = (counter * activeDelayLen) / 60000;
+
+              if(currentMin > minutes) {
+                  minutes = currentMin;
+                  if (LOG.isInfoEnabled()) LOG.info("Delaying split due to transactions present. Delayed : " + 
+                                                    minutes + " minute(s) on " + hri.getRegionNameAsString());
+              }
+              if(minutes >= splitDelayLimit) {
+                  if(LOG.isWarnEnabled()) LOG.warn("Surpassed split delay limit of " + splitDelayLimit
+                                                  + " minutes. Continuing with split");
+                  delayMsg = false;
+                  break;
+              }
+          } catch (InterruptedException e) {
+              String error = "Problem while calling sleep() on preSplit delay - activeWait: " + e;
+              if(LOG.isErrorEnabled()) LOG.error(error);
+              throw new IOException(error);
+          }
+      }
+      if(delayMsg) {
+        if(LOG.isWarnEnabled()) LOG.warn("Continuing with split operation, no active transactions on: " + hri.getRegionNameAsString());
+      }
+  }
+  
+  protected void zkCleanup() {
+      if(LOG.isTraceEnabled()) LOG.trace("zkCleanup -- ENTRY");
+      try {
+          List<String> trafTables = ZKUtil.listChildrenNoWatch(zkw, zSplitBalPathNoSlash);
+          List<String> hbaseTables = ZKUtil.listChildrenNoWatch(zkw, SplitBalanceHelper.zkTable);
+          
+          for(String tableName : trafTables) {
+              if(!hbaseTables.contains(tableName)) {
+                  if(LOG.isTraceEnabled()) LOG.trace("zkCleanup, removing " + zSplitBalPath + tableName);
+                  ZKUtil.deleteNodeRecursively(zkw, zSplitBalPath + tableName);
+              }
+          }
+      } catch(KeeperException ke) {
+          if(LOG.isErrorEnabled()) LOG.error("zkCleanup error, please check your ZooKeeper: " + ke);
+      }
+      if(LOG.isTraceEnabled()) LOG.trace("zkCleanup -- EXIT");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
index e426159..2e64c12 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
@@ -91,6 +91,18 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.codec.binary.Hex;
 
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ProtoUtil;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionPersist;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionStateMsg;
+import com.google.protobuf.CodedInputStream;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -136,10 +148,12 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -268,7 +282,7 @@ CoprocessorService, Coprocessor {
 
   // Concurrent map for transactional region scanner holders
   // Protected by synchronized methods
-  final ConcurrentHashMap<Long,
+  private ConcurrentHashMap<Long,
                           TransactionalRegionScannerHolder> scanners =
       new ConcurrentHashMap<Long, TransactionalRegionScannerHolder>();
 
@@ -341,6 +355,9 @@ CoprocessorService, Coprocessor {
   String lv_hostName;
   int lv_port;
   private static String zNodePath = "/hbase/Trafodion/recovery/";
+  private static final String COMMITTED_TXNS_KEY = "1_COMMITED_TXNS_KEY";
+  private static final String TXNS_BY_ID_KEY = "2_TXNS_BY_ID_KEY";
+  private HFileContext context = new HFileContextBuilder().withIncludesTags(false).build();
 
   private static final int MINIMUM_LEASE_TIME = 7200 * 1000;
   private static final int LEASE_CHECK_FREQUENCY = 1000;
@@ -367,12 +384,14 @@ CoprocessorService, Coprocessor {
   private static float memoryPercentage = 0;
   private static boolean memoryThrottle = false;
   private static boolean suppressOutOfOrderProtocolException = DEFAULT_SUPPRESS_OOP;
+  private Configuration config;
 
   // Transaction state defines
   private static final int COMMIT_OK = 1;
   private static final int COMMIT_OK_READ_ONLY = 2;
   private static final int COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR = 3;
   private static final int COMMIT_CONFLICT = 5;
+  private static final int COMMIT_RESEND = 6;
 
   private static final int CLOSE_WAIT_ON_COMMIT_PENDING = 1000;
   private static final int MAX_COMMIT_PENDING_WAITS = 10;
@@ -392,7 +411,7 @@ CoprocessorService, Coprocessor {
 
   public static final String trxkeyEPCPinstance = "EPCPinstance";
   // TBD Maybe we should just use HashMap to improve the performance, ConcurrentHashMap could be too strict
-  static ConcurrentHashMap<String, Object> transactionsEPCPMap = new ConcurrentHashMap<String, Object>();
+  static ConcurrentHashMap<String, Object> transactionsEPCPMap;
 
   // TrxRegionService methods
     
@@ -839,9 +858,11 @@ CoprocessorService, Coprocessor {
       } catch (UnknownTransactionException u) {
         if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString());
         ute = u;
+        status = COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
       } catch (IOException e) {
         if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught IOException after internal commitRequest call - "+ e.toString());
         ioe = e;
+        status = COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
       }
     }
 
@@ -2851,6 +2872,13 @@ CoprocessorService, Coprocessor {
     return this;
   }
 
+  static public ConcurrentHashMap<String, Object> getRegionMap() {
+    if (transactionsEPCPMap == null) {
+      transactionsEPCPMap = new ConcurrentHashMap<String, Object>();
+    }
+    return transactionsEPCPMap;
+  }
+
   /**
    * Stores a reference to the coprocessor environment provided by the
    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} 
@@ -2864,6 +2892,9 @@ CoprocessorService, Coprocessor {
    */
   @Override
   public void start(CoprocessorEnvironment env) throws IOException {
+    if (transactionsEPCPMap == null)
+      transactionsEPCPMap = new ConcurrentHashMap<String, Object>();
+
     if (env instanceof RegionCoprocessorEnvironment) {
       this.env = (RegionCoprocessorEnvironment)env;
     } else {
@@ -2878,30 +2909,30 @@ CoprocessorService, Coprocessor {
     this.t_Region = (TransactionalRegion) tmp_env.getRegion();
     this.fs = this.m_Region.getFilesystem();
 
-    org.apache.hadoop.conf.Configuration conf = tmp_env.getConfiguration(); 
+    this.config = tmp_env.getConfiguration();
     
     synchronized (stoppableLock) {
       try {
-        this.transactionLeaseTimeout = conf.getInt(LEASE_CONF, MINIMUM_LEASE_TIME);
+        this.transactionLeaseTimeout = config.getInt(LEASE_CONF, MINIMUM_LEASE_TIME);
         if (this.transactionLeaseTimeout < MINIMUM_LEASE_TIME) {
           if (LOG.isWarnEnabled()) LOG.warn("Transaction lease time: " + this.transactionLeaseTimeout + ", was less than the minimum lease time.  Now setting the timeout to the minimum default value: " + MINIMUM_LEASE_TIME);
           this.transactionLeaseTimeout = MINIMUM_LEASE_TIME;
         }
 
-        this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(conf,
+        this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(config,
           HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
           HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
           HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
-        this.scannerThreadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+        this.scannerThreadWakeFrequency = config.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
 
-        this.cleanTimer = conf.getInt(SLEEP_CONF, DEFAULT_SLEEP);
-        this.memoryUsageThreshold = conf.getInt(MEMORY_THRESHOLD, DEFAULT_MEMORY_THRESHOLD);
-        this.memoryUsagePerformGC = conf.getBoolean(MEMORY_PERFORM_GC, DEFAULT_MEMORY_PERFORM_GC);
-        this.memoryUsageWarnOnly = conf.getBoolean(MEMORY_WARN_ONLY, DEFAULT_MEMORY_WARN_ONLY);
-        this.memoryUsageTimer = conf.getInt(MEMORY_CONF, DEFAULT_MEMORY_SLEEP);
-        this.memoryUsageTimer = conf.getInt(MEMORY_CONF, DEFAULT_MEMORY_SLEEP);
+        this.cleanTimer = config.getInt(SLEEP_CONF, DEFAULT_SLEEP);
+        this.memoryUsageThreshold = config.getInt(MEMORY_THRESHOLD, DEFAULT_MEMORY_THRESHOLD);
+        this.memoryUsagePerformGC = config.getBoolean(MEMORY_PERFORM_GC, DEFAULT_MEMORY_PERFORM_GC);
+        this.memoryUsageWarnOnly = config.getBoolean(MEMORY_WARN_ONLY, DEFAULT_MEMORY_WARN_ONLY);
+        this.memoryUsageTimer = config.getInt(MEMORY_CONF, DEFAULT_MEMORY_SLEEP);
+        this.memoryUsageTimer = config.getInt(MEMORY_CONF, DEFAULT_MEMORY_SLEEP);
 
-        this.suppressOutOfOrderProtocolException = conf.getBoolean(SUPPRESS_OOP, DEFAULT_SUPPRESS_OOP);
+        this.suppressOutOfOrderProtocolException = config.getBoolean(SUPPRESS_OOP, DEFAULT_SUPPRESS_OOP);
 	if (this.transactionLeases == null)  
 	    this.transactionLeases = new Leases(LEASE_CHECK_FREQUENCY);
 
@@ -3076,6 +3107,16 @@ CoprocessorService, Coprocessor {
         transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyClosingVar,
                                   this.closing);
     }
+    ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scannersCheck =
+        (ConcurrentHashMap<Long,TransactionalRegionScannerHolder>)transactionsByIdTestz
+        .get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyScanners);
+    if(scannersCheck != null) {
+      this.scanners = scannersCheck;
+    }
+    else {
+      transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyScanners,
+                              this.scanners);
+    }
 
     // Set up the memoryBean from the ManagementFactory
     if (memoryUsageThreshold < DEFAULT_MEMORY_THRESHOLD) 
@@ -4059,7 +4100,6 @@ CoprocessorService, Coprocessor {
      throws IOException {
 
     if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: beginTransaction -- ENTRY txId: " + transactionId);
-    checkClosing(transactionId);
 
     // TBD until integration with recovery 
     if (reconstructIndoubts == 0) {
@@ -4174,6 +4214,7 @@ CoprocessorService, Coprocessor {
     if (LOG.isTraceEnabled()) LOG.trace("Enter TrxRegionEndpoint coprocessor: beginTransIfNotExist, txid: "
               + transactionId + " transactionsById size: "
               + transactionsById.size());
+    checkClosing(transactionId);
 
     String key = getTransactionalUniqueId(transactionId);
     synchronized (transactionsById) {
@@ -4303,8 +4344,10 @@ CoprocessorService, Coprocessor {
     }
       // may change to indicate a NOTFOUND case  then depends on the TM ts state, if reinstated tx, ignore the exception
       if (state == null) {
-        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest encountered unknown transactionID txId: " + transactionId + " returning COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR");
-        return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
+        String errMsg = "TrxRegionEndpoint coprocessor: commitRequest encountered unknown transactionID txId: " + transactionId;
+        if (LOG.isTraceEnabled()) LOG.trace(errMsg);
+        //return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
+        throw new UnknownTransactionException(errMsg);
       }
 
     if (LOG.isInfoEnabled()) 
@@ -4503,6 +4546,8 @@ CoprocessorService, Coprocessor {
 
     // Otherwise we were read-only and commitable, so we can forget it.
     state.setStatus(Status.COMMITED);
+    if(state.getSplitRetry())
+      return COMMIT_RESEND;
     retireTransaction(state, true);
     if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest READ ONLY -- EXIT txId: " + transactionId);
     return COMMIT_OK_READ_ONLY;
@@ -4673,7 +4718,7 @@ CoprocessorService, Coprocessor {
          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: " + transactionId + " COMMIT_OK");
          return true;
        } catch (Throwable e) {
-         LOG.error("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", Caught exception after internal commit call "
+        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", Caught exception after internal commit call "
                     + e.getMessage() + " " + stackTraceToString(e));
         throw new IOException(e.toString());
        }
@@ -5178,6 +5223,199 @@ CoprocessorService, Coprocessor {
       }
     }
   }
+  public void flushToFS(Path flushPath) throws IOException {
+    TransactionPersist.Builder txnPersistBuilder = TransactionPersist.newBuilder();
+    fs.delete(flushPath, true);
+
+    HFileWriterV2 w =
+        (HFileWriterV2)
+        HFile.getWriterFactory(config, new CacheConfig(config))
+        .withPath(fs, flushPath).withFileContext(context).create();
+
+    Map<Long, TrxTransactionState> transactionMap = new HashMap<Long, TrxTransactionState>();
+
+    for(TrxTransactionState ts : transactionsById.values()) {
+      transactionMap.put(ts.getTransactionId(), ts);
+      txnPersistBuilder.addTxById(ts.getTransactionId());
+    }
+    for(Map.Entry<Long, TrxTransactionState> entry :
+        commitedTransactionsBySequenceNumber.entrySet()) {
+      transactionMap.put(entry.getValue().getTransactionId(), entry.getValue());
+      txnPersistBuilder.addSeqNoListSeq(entry.getKey());
+      txnPersistBuilder.addSeqNoListTxn(entry.getValue().getTransactionId());
+    }
+    for(TrxTransactionState ts : transactionMap.values()) {
+      for(TrxTransactionState ts2 : ts.getTransactionsToCheck()) {
+        transactionMap.put(ts.getTransactionId(), ts);
+      }
+    }
+    txnPersistBuilder.setNextSeqId(nextSequenceId.get());
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+    for(TrxTransactionState ts : transactionMap.values()) {
+      TransactionStateMsg.Builder tsBuilder =  TransactionStateMsg.newBuilder();
+      tsBuilder.setTxId(ts.getTransactionId());
+      tsBuilder.setStartSeqNum(ts.getStartSequenceNumber());
+      tsBuilder.setSeqNum(ts.getHLogStartSequenceId());
+      tsBuilder.setLogSeqId(ts.getLogSeqId());
+      tsBuilder.setReinstated(ts.isReinstated());
+
+      if(ts.getCommitProgress() == null)
+          tsBuilder.setCommitProgress(-1);
+      else
+          tsBuilder.setCommitProgress(ts.getCommitProgress().ordinal());
+
+      tsBuilder.setStatus(ts.getStatus().ordinal());
+      for (WriteAction wa : ts.getWriteOrdering()) {
+        if(wa.getPut() != null) {
+          tsBuilder.addPutOrDel(true);
+          tsBuilder.addPut(ProtobufUtil.toMutation(MutationType.PUT, wa.getPut()));
+        }
+        else {
+          tsBuilder.addPutOrDel(false);
+
+          tsBuilder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, wa.getDelete()));
+        }
+      }
+      tsBuilder.build().writeDelimitedTo(output);
+    }
+    byte [] firstByte = output.toByteArray();
+
+    w.append(new KeyValue(Bytes.toBytes(COMMITTED_TXNS_KEY), Bytes.toBytes("cf"), Bytes.toBytes("qual"),
+      firstByte));
+
+    byte [] persistByte = txnPersistBuilder.build().toByteArray();
+    TransactionPersist persistMsg = TransactionPersist.parseFrom(persistByte);
+    w.append(new KeyValue(Bytes.toBytes(TXNS_BY_ID_KEY), Bytes.toBytes("cf"), Bytes.toBytes("qual"),
+      persistByte));
+    w.close();
+  }
+
+  public void readTxnInfo(Path flushPath) throws IOException {
+    readTxnInfo(flushPath, false);
+  }
 
+  public void readTxnInfo(Path flushPath, boolean setRetry) throws IOException {
+          if(LOG.isTraceEnabled()) LOG.trace("readTxnInfo -- ENTRY, Path: " + flushPath.toString());
+
+          try {
+              HFile.Reader reader = HFile.createReader(fs, flushPath, new CacheConfig(config), config);
+              HFileScanner scanner = reader.getScanner(true, false);
+              scanner.seekTo();
+              //KeyValue firstVal = scanner.getKeyValue();
+              Cell firstVal = scanner.getKeyValue();
+              scanner.next();
+              //KeyValue persistKV = scanner.getKeyValue();
+              Cell persistKV = scanner.getKeyValue();
+
+              if(firstVal == null || persistKV == null) {
+                throw new IOException("Invalid values read from HFile in readTxnInfo");
+              }
+
+              Map<Long, TrxTransactionState> txnMap = new HashMap<Long, TrxTransactionState>();
+              Map<Long, List<Long>> txnsToCheckMap = new HashMap<Long, List<Long>>();
+              ByteArrayInputStream input = new ByteArrayInputStream(CellUtil.cloneValue(firstVal));
+
+              TransactionStateMsg tsm  = TransactionStateMsg.parseDelimitedFrom(input);
+              while (tsm != null) {
+                TrxTransactionState ts = new TrxTransactionState(tsm.getTxId(),
+                                                                 tsm.getSeqNum(),
+                                                                 new AtomicLong(tsm.getLogSeqId()),
+                                                                 m_Region.getRegionInfo(),
+                                                                 m_Region.getTableDesc(),
+                                                                 m_Region.getLog(),
+                                                                 configuredEarlyLogging);
+                ts.setStartSequenceNumber(tsm.getStartSeqNum());
+
+                List<Boolean> putOrDel = tsm.getPutOrDelList();
+                List<MutationProto> puts = tsm.getPutList();
+                List<MutationProto> deletes = tsm.getDeleteList();
+
+                int putIndex = 0;
+                int deleteIndex = 0;
+                for (Boolean put : putOrDel) {
+                  if(put) {
+                    Put writePut = ProtobufUtil.toPut(puts.get(putIndex++));
+                    if(m_Region.rowIsInRange(regionInfo, writePut.getRow())) {
+                        ts.addWrite(writePut);
+                    }
+                  }
+                  else {
+                    Delete writeDelete = ProtobufUtil.toDelete(deletes.get(deleteIndex++));
+                    if(m_Region.rowIsInRange(regionInfo, writeDelete.getRow())) {
+                        ts.addDelete(writeDelete);
+                    }
+                  }
+                }
+                txnsToCheckMap.put(tsm.getTxId(), tsm.getTxnsToCheckList());
+                if(setRetry)
+                  ts.setSplitRetry(true);
+                txnMap.put(ts.getTransactionId(), ts);
+                tsm  = TransactionStateMsg.parseDelimitedFrom(input);
+              }
+
+              for(TrxTransactionState ts : txnMap.values()) {
+                for (Long txid : txnsToCheckMap.get(ts.getTransactionId())) {
+                  TrxTransactionState mapTS = txnMap.get(txid);
+                  if(mapTS != null)
+                    ts.addTransactionToCheck(mapTS);
+                }
+              }
+              TransactionPersist txnPersistMsg = TransactionPersist.parseFrom(CellUtil.cloneValue(persistKV));
+
+              if(txnPersistMsg == null) {
+                throw new IOException("Invalid protobuf, message is null.");
+              }
+              for (Long txid : txnPersistMsg.getTxByIdList()) {
+                String key = getTransactionalUniqueId(txid);
+                TrxTransactionState ts = txnMap.get(txid);
+                if (ts != null) {
+                  TrxTransactionState existingTs = transactionsById.get(txid);
+                  if(existingTs != null) {
+                    for(WriteAction wa : existingTs.getWriteOrdering()) {
+                      if(wa.getPut() != null) {
+                        ts.addWrite(wa.getPut());
+                      }
+                      else {
+                        ts.addDelete(wa.getDelete());
+                      }
+                    }
+                  }
+                  transactionsById.put(key, ts);
+                  transactionLeases.createLease(key, transactionLeaseTimeout, new TransactionLeaseListener(txid));
+                }
+                else {
+                  TrxTransactionState tsEntry = new TrxTransactionState(txid,
+              0,
+              new AtomicLong(0),
+              m_Region.getRegionInfo(),
+              m_Region.getTableDesc(),
+              m_Region.getLog(),
+              configuredEarlyLogging);
+                  transactionsById.putIfAbsent(key, tsEntry);
+                }
+              }
+
+              for (int i = 0; i < txnPersistMsg.getSeqNoListSeqCount(); i++) {
+                TrxTransactionState ts = txnMap.get(txnPersistMsg.getSeqNoListTxn(i));
+                if (ts!=null)
+                  commitedTransactionsBySequenceNumber.put(txnPersistMsg.getSeqNoListSeq(i), ts);
+              }
+
+              this.nextSequenceId = new AtomicLong(txnPersistMsg.getNextSeqId());
+            } catch(IOException e) {
+                StringWriter sw = new StringWriter();
+                PrintWriter pw = new PrintWriter(sw);
+                e.printStackTrace(pw);
+                LOG.error(sw.toString());
+            }
+          if(LOG.isTraceEnabled()) LOG.trace("readTxnInfo -- EXIT");
+
+  }
+
+  public void setClosing(boolean value) {
+    closing.set(value);
+  }
 }
 //1}