You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2017/03/22 01:55:45 UTC

[1/3] incubator-trafodion git commit: [TRAFODION-2468] DDL HA , step 2 changes(please refer jira).

Repository: incubator-trafodion
Updated Branches:
  refs/heads/master 3c1c4ad4c -> f7e27c5d1


[TRAFODION-2468] DDL HA , step 2 changes(please refer jira).


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/d133d621
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/d133d621
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/d133d621

Branch: refs/heads/master
Commit: d133d6211b5c93bd3b79fee28db7d68b42df05ab
Parents: b5e7300
Author: Prashant Vasudev <pr...@esgyn.com>
Authored: Tue Mar 7 00:00:38 2017 +0000
Committer: Prashant Vasudev <pr...@esgyn.com>
Committed: Tue Mar 7 00:00:38 2017 +0000

----------------------------------------------------------------------
 .../hbase/client/transactional/TmDDL.java       |  33 +-
 .../java/org/trafodion/dtm/HBaseTxClient.java   | 485 +++++++++++++------
 2 files changed, 365 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/d133d621/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
index 83eb945..af4c2e5 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
@@ -50,7 +50,8 @@ import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 
 public class TmDDL {
 
@@ -300,4 +301,34 @@ public class TmDDL {
             Delete d = new Delete(Bytes.toBytes(lvTransid));
             table.delete(d);
     }
+    
+    /**
+     * Scans the TmDDL table and collects the transaction ids that belong to
+     * the given TM and whose recorded DDL state is "VALID" or "REDRIVE".
+     *
+     * @param tmID node id of the TM whose transactions are wanted
+     * @return the matching transaction ids; empty when there are none
+     * @throws IOException propagated from opening the table scanner
+     */
+    public List<Long> getTxIdList(short tmID) throws IOException {
+      Scan scan = new Scan();
+      scan.setCaching(100);
+      scan.setCacheBlocks(false);
+
+      List<Long> ownedTxIds = new ArrayList<Long>();
+      ResultScanner scanner = table.getScanner(scan);
+      try {
+        for (Result row : scanner) {
+          // The row key is the transaction id.
+          Long txid = Bytes.toLong(row.getRow());
+          if (TransactionState.getNodeId(txid) == tmID) {
+            byte[] stateBytes = row.getValue(TDDL_FAMILY, TDDL_STATE);
+            if (stateBytes == null || stateBytes.length == 0) {
+              // No state recorded for this transaction.
+              continue;
+            }
+            String state = Bytes.toString(stateBytes);
+            if (state.equals("VALID") || state.equals("REDRIVE")) {
+              ownedTxIds.add(txid);
+            }
+          }
+          // Rows whose node id differs are not owned by this tmID and are skipped.
+        }
+      }
+      finally {
+        // Always release the scanner, even if iteration fails.
+        scanner.close();
+      }
+
+      return ownedTxIds;
+    }
+    
  }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/d133d621/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index dd6bffe..cef53b1 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -911,6 +911,98 @@ public class HBaseTxClient {
      /**
       * Thread to gather recovery information for regions that need to be recovered 
       */
+   /*
+    * DDL specific recovery operation use cases are as follows:
+    * In case of phase 1 prepare , phase 2 commit, DDL operations are
+    * completed once the region operations are complete. However, in case of the TM
+    * going down and restarting, some of the state information is lost.
+    * 1. If the DDL operation also involves DML operations, regions that have
+    * in doubt transaction request for help. Recovery thread here reconstructs 
+    * the TS state and also DDL operation state from TMDDL and redrives the operation.
+    * 
+    * 2. If all the regions have completed their operations, only the DDL operation
+    * is pending, and the TM goes down and is restarted, there are no regions that
+    * would seek help to redrive the operations. A pending DDL operation
+    * would be left starving, as there are no triggers to redrive it.
+    * To handle this case, every time the TM starts, as part of recovery thread startup,
+    * a general scan of TMDDL is made to find the transIDs it owns; those that
+    * have active DDL are checked against the state of the transaction and redriven appropriately.
+    * 
+    * 3. Failure of TM and restart of TM can happen at any state of DDL operation 
+    * in progress and before that operation is recorded as complete. One way to 
+    * accurately keep note of this operation in progress is to record the operation 
+    * before and after the operation. For this, the table against which the operation
+    * is being performed would be the key in a new log, we choose another table
+    * called TmDDLObject table. This acts as a global semaphore for DDL table 
+    * operation. Recovery Thread as part of its startup processing always checks 
+    * against TmDDLObject table and if it owns the transaction, continues to
+    * recover the DDL operation. 
+    *
+    */
+    /* DDL Use cases and detailed info :
+
+    Use case                Tx result     phase 1       phase 2
+    ----------              ----------    -------     ------------
+    Create Table            commit        no op         no op 
+    Create Table            rollback      no op         disable ,delete             
+    
+    create table ,insert    commit        no op         no op
+    create table, insert    rollback      no op         disable, delete    Notes:need flag for preclose, make this flag persistent in tmddl and region
+    
+    
+    Drop Table              commit        disable       delete                       
+    Drop Table              rollback      no op         no op                        
+    
+    Insert, drop table      commit        disable       delete             Notes: flag for preclose,   make this flag persistent in tmddl and region.
+    Insert, drop table      rollback      no op         no op
+    
+    
+    0. Commit thread and recovery thread do the same thing: either commit or abort.
+    
+    1. Commit thread is aware of other regions. Recovery thread is not aware of other regions.
+       Commit thread doing commit or abort will perform end point calls to all regions involved followed by DDL. 
+       Recovery thread doing commit or abort will perform DDL operation separate and region operation separate.
+       The question is whether DDL and DML commit processing can be done in parallel or decoupled. It looks like it is OK, since the MD DML
+       and the actual DDL share the same decision, either to commit or to abort.
+       One concern is visibility of the table based on corresponding MD changes being recovered( commit or abort)
+       at different times. 
+       
+       With create table scenario, unwinding DDL and MD at different times does not matter since
+       creation of new table from a new thread will get errors from Hmaster since table still exists in hmaster.
+       One caveat is the TM believing the table was created when in fact the TM died before creating it. When the recovery thread attempts to recover the abort,
+       it goes ahead and deletes the table (which some other thread may already have succeeded in creating), which is very dangerous.
+       This scenario is handled as part of step 3 below.
+       
+       With drop table scenario, unwinding drop DDL and MD at slightly different times is acceptable since:
+       
+       In general, whether recovery thread or commit thread is driving the commit or abort, visibility of the table
+       is the same as originally intended. 
+       
+       Having DDL table and DML in the same table, having insertsDrop and createInserts flag persistent is needed for 
+       redrive of commit or abort.
+      
+      
+    2. Commit thread and recovery thread cannot overlap or do same execution in parallel. 
+       Transaction status in Tlog must be the sole decision maker for recovery thread to redrive. If recovery thread
+       finds a active transaction state object in memory, it assumes commit thread is handling it.
+    
+       
+    3. Failure of TM and restart of TM can happen at any state of DDL operation in progress and before that operation is recorded as complete. 
+       
+       In case of create table,  TM dies before create table,  tx is now aborted.  Visibility of table is "Does not exist". However
+       the recovery thread should not attempt to delete the table if it really did not create it ( some other thread might have created it in parallel).
+       This is where we need a global semaphore on table name.
+       
+       In case of create table,  TM dies after create table,  tx is now aborted. Visibility of table is "Does not exist". recovery thread
+       attempting to delete the table is ok here.
+       
+       In case of drop table, the visibility of the table at HMaster prevents other threads from creating a duplicate table. There is a scenario of
+       accidentally dropping a duplicate table.
+    
+       In general, a global semaphore kind of method helps to reduce the complexity and recover from various stages of failures.  
+    
+    */
+   
      private static class RecoveryThread extends Thread{
              static final int SLEEP_DELAY = 1000; // Initially set to run every 1sec
              private int sleepTimeInt = 0;
@@ -928,6 +1020,7 @@ public class HBaseTxClient {
              private boolean useTlog;
              HBaseTxClient hbtx;
              private static int envSleepTimeInt;
+             private boolean ddlOnlyRecoveryCheck = true;
 
          static {
             String sleepTime = System.getenv("TMRECOV_SLEEP");
@@ -997,16 +1090,36 @@ public class HBaseTxClient {
                  ts.addRegion(loc);
              }
 
-            @Override
-             public void run() {
-
+    @Override
+    public void run() {
+        
+        //Start of recovery thread could be due to only two scenarios.
+        //1. TM coming up as part of node up. In this case recovery thread 
+        //   assigned node id is the same as own node.
+        //2. As part of TM going down, LDTM starting a new recovery thread
+        //   corresponding to the node id of TM that just went down. In this
+        //   case, recovery thread node id is that of remote node that went down.
+        //
+        //In both scenarios, there may be DDL only recovery or DDL/DML region
+        //recovery. DDL/DML region recovery is driven from the affected regions.
+        //In the loop below, DDL-only recovery is checked first. This check
+        //is performed only once, at the beginning. The rest of the loop
+        //performs the DDL/DML region recovery check.
+      
                 while (this.continueThread) {
                     try {
                         skipSleep = false;
                         Map<String, byte[]> regions = null;
-                        Map<Long, TransactionState> transactionStates =
-                                new HashMap<Long, TransactionState>();
+                        Map<Long, TransactionState> transactionStates = null;
                         boolean loopBack = false;
+                        if(ddlOnlyRecoveryCheck)
+                        {
+                          transactionStates = getTransactionsFromTmDDL();
+                          if(transactionStates != null)
+                            recoverTransactions(transactionStates);
+                          ddlOnlyRecoveryCheck = false;
+                        }
+                        
                         do {
                            try {
                                loopBack = false;
@@ -1022,152 +1135,10 @@ public class HBaseTxClient {
                             skipSleep = true;
                             recoveryIterations++;
 
-                            if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt region size " + regions.size());
-                            for (Map.Entry<String, byte[]> regionEntry : regions.entrySet()) {
-                                List<Long> TxRecoverList = new ArrayList<Long>();
-                                String hostnamePort = regionEntry.getKey();
-                                byte[] regionBytes = regionEntry.getValue();
-                                if (LOG.isDebugEnabled())
-                                    LOG.debug("TRAF RCOV THREAD:Recovery Thread Processing region: " + new String(regionBytes));
-                                if (recoveryIterations == 0) {
-                                   if(LOG.isWarnEnabled()) {
-                                      //  Let's get the host name
-                                      final byte [] delimiter = ",".getBytes();
-                                      String[] hostname = hostnamePort.split(new String(delimiter), 3);
-                                      if (hostname.length < 2) {
-                                         throw new IllegalArgumentException("hostnamePort format is incorrect");
-                                      }
-
-                                      LOG.warn ("TRAF RCOV THREAD:Starting recovery with " + regions.size() +
-                                           " regions to recover.  First region hostname: " + hostnamePort +
-                                           " Recovery iterations: " + recoveryIterations);
-                                   }
-                                }
-                                else {
-                                   if(recoveryIterations % 10 == 0) {
-                                      if(LOG.isWarnEnabled()) {
-                                         //  Let's get the host name
-                                         final byte [] delimiter = ",".getBytes();
-                                         String[] hostname = hostnamePort.split(new String(delimiter), 3);
-                                         if (hostname.length < 2) {
-                                            throw new IllegalArgumentException("hostnamePort format is incorrect");
-                                         }
-                                         LOG.warn("TRAF RCOV THREAD:Recovery thread encountered " + regions.size() +
-                                           " regions to recover.  First region hostname: " + hostnamePort +
-                                           " Recovery iterations: " + recoveryIterations);
-                                      }
-                                   }
-                                }
-                                try {
-                                    TxRecoverList = txnManager.recoveryRequest(hostnamePort, regionBytes, tmID);
-                                }
-                                catch (IOException e) {
-                                   // For all cases of Exception, we rely on the region to redrive the request.
-                                   // Likely there is nothing to recover, due to a stale region entry, but it is always safe to redrive.
-                                   // We log a warning event and delete the ZKNode entry.
-                                   LOG.warn("TRAF RCOV THREAD:Exception calling txnManager.recoveryRequest. " + "TM: " +
-                                              tmID + " regionBytes: [" + regionBytes + "].  Deleting zookeeper region entry. \n exception: ", e);
-                                   zookeeper.deleteRegionEntry(regionEntry);
-
-                                   // In the case of NotServingRegionException we will repost the ZKNode after refreshing the table.
-                                   if ((e instanceof NotServingRegionException) || (e.getCause() instanceof NotServingRegionException)){
-                                       // Create a local HTable object using the regionInfo
-                                       HTable table = new HTable(config, HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString());
-                                       // Repost a zookeeper entry for all current regions in the table
-                                       zookeeper.postAllRegionEntries(table);
-                                   }
-                                } // IOException
-
-                                if (TxRecoverList != null) {
-                                    if (LOG.isDebugEnabled()) LOG.trace("TRAF RCOV THREAD:size of TxRecoverList " + TxRecoverList.size());
-                                    if (TxRecoverList.size() == 0) {
-                                         // First delete the zookeeper entry
-                                         LOG.warn("TRAF RCOV THREAD:Leftover Znode  calling txnManager.recoveryRequest. " + "TM: " +
-                                                 tmID + " regionBytes: [" + regionBytes + "].  Deleting zookeeper region entry. ");
-                                         zookeeper.deleteRegionEntry(regionEntry);
-                                   }
-                                   for (Long txid : TxRecoverList) {
-                                      TransactionState ts = transactionStates.get(txid);
-                                      if (ts == null) {
-                                         ts = new TransactionState(txid);
-
-                                         //Identify if DDL is part of this transaction and valid
-                                         if(hbtx.useDDLTrans){
-                                            TmDDL tmDDL = hbtx.getTmDDL();
-                                            StringBuilder state = new StringBuilder ();
-                                            tmDDL.getState(txid,state);
-                                            if(state.toString().equals("VALID"))
-                                               ts.setDDLTx(true);
-                                         }
-                                      }
-                                      this.addRegionToTS(hostnamePort, regionBytes, ts);
-                                      transactionStates.put(txid, ts);
-                                   }
-                                }
-                                else if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD:size od TxRecoverList is NULL ");
-                            }
-                            if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt transaction size " + transactionStates.size());
-                            for (Map.Entry<Long, TransactionState> tsEntry : transactionStates.entrySet()) {
-                                int isTransactionStillAlive = 0;
-                                TransactionState ts = tsEntry.getValue();
-                                Long txID = ts.getTransactionId();
-                                // TransactionState ts = new TransactionState(txID);
-                                
-                                //It is possible for long prepare situations that involve multiple DDL
-                                //operations, multiple prompts from RS is received. Hence check to see if there
-                                //is a TS object in main TS list and transaction is still active.
-                                //Note that tsEntry is local TS object. 
-                                if (hbtx.mapTransactionStates.get(txID) != null) {
-                                  if (hbtx.mapTransactionStates.get(txID).getStatus().toString().contains("ACTIVE")) {
-                                    isTransactionStillAlive = 1;
-                                  }
-                                  if (LOG.isInfoEnabled()) 
-                                  LOG.info("TRAF RCOV THREAD: TID " + txID
-                                            + " still has TS object in TM memory. TS details: "
-                                            + hbtx.mapTransactionStates.get(txID).toString() 
-                                            + " transactionAlive: " + isTransactionStillAlive);
-                                  if(isTransactionStillAlive == 1)
-                                    continue; //for loop
-                                }
-                               
-                                try {
-                                    audit.getTransactionState(ts);
-                                    if (ts.getStatus().equals(TransState.STATE_COMMITTED.toString())) {
-                                        if (LOG.isDebugEnabled())
-                                            LOG.debug("TRAF RCOV THREAD:Redriving commit for " + txID + " number of regions " + ts.getParticipatingRegions().size() +
-                                                    " and tolerating UnknownTransactionExceptions");
-                                        txnManager.doCommit(ts, true /*ignore UnknownTransactionException*/);
-                                        if(useTlog && useForgotten) {
-                                            long nextAsn = tLog.getNextAuditSeqNum((int)TransactionState.getNodeId(txID));
-                                            tLog.putSingleRecord(txID, ts.getCommitId(), "FORGOTTEN", null, forceForgotten, nextAsn);
-                                        }
-                                    } else if (ts.getStatus().equals(TransState.STATE_ABORTED.toString())) {
-                                        if (LOG.isDebugEnabled())
-                                            LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
-                                        txnManager.abort(ts);
-                                    } else {
-                                        if (LOG.isDebugEnabled())
-                                            LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
-                                        LOG.warn("Recovering transaction " + txID + ", status is not set to COMMITTED or ABORTED. Aborting.");
-                                        txnManager.abort(ts);
-                                    }
-
-                                } catch (UnsuccessfulDDLException ddle) {
-                                    LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , ddle);
-
-                                    //Note that there may not be anymore redrive triggers from region server point of view for DDL operation.
-                                    //Register this DDL transaction for subsequent redrive from Audit Control Event.
-                                    //TODO: Launch a new Redrive Thread out of auditControlPoint().
-                                    TmDDL tmDDL = hbtx.getTmDDL();
-                                    tmDDL.setState(txID,"REDRIVE");
-                                    LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
-                                } catch (CommitUnsuccessfulException cue) {
-                                    LOG.error("CommitUnsuccessfulException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , cue);
-                                    TmDDL tmDDL = hbtx.getTmDDL();
-                                    tmDDL.setState(txID,"REDRIVE");
-                                    LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
-                                }
-                            }
+                            transactionStates = getTransactionsFromRegions(regions);
+                            
+                            if(transactionStates != null)
+                                recoverTransactions(transactionStates);
 
                         }
                         else {
@@ -1232,8 +1203,218 @@ public class HBaseTxClient {
                 }
                 if(LOG.isDebugEnabled()) LOG.debug("Exiting recovery thread for tm ID: " + tmID);
             }
-     }
+            
+    /**
+     * Asks every in-doubt region which transactions it needs help with and
+     * groups the answers by transaction id.
+     *
+     * For each region entry, txnManager.recoveryRequest() is called. If it throws
+     * an IOException, the region's zookeeper entry is deleted (and, for a
+     * NotServingRegionException, entries for all current regions of the table
+     * are reposted). An empty answer also deletes the zookeeper entry. Otherwise
+     * the region is added to the TransactionState of each returned transaction
+     * id; a newly created TransactionState is flagged as a DDL transaction when
+     * DDL transactions are enabled and TmDDL reports state "VALID".
+     *
+     * @param regions in-doubt regions: key is the hostname/port string, value is
+     *                the serialized region info
+     * @return transaction id to TransactionState, accumulated over all regions in
+     *         the input; empty (never null) when no transaction was reported
+     * @throws IllegalArgumentException if a region key has fewer than two
+     *         comma-separated fields (only checked when a progress warning is logged)
+     */
+    private  Map<Long, TransactionState> getTransactionsFromRegions(
+                           Map<String, byte[]> regions)
+                           throws IOException, KeeperException,
+                           DeserializationException
+    {
+        if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt region size " + regions.size());
+
+        // Shared across ALL regions: a transaction that spans several regions must
+        // end up with a single TransactionState listing every participating region.
+        // (Creating and returning this map inside the loop would process only the
+        // first region.)
+        Map<Long, TransactionState> transactionStates =
+                        new HashMap<Long, TransactionState>();
+
+        for (Map.Entry<String, byte[]> regionEntry : regions.entrySet()) {
+            List<Long> TxRecoverList = new ArrayList<Long>();
+            String hostnamePort = regionEntry.getKey();
+            byte[] regionBytes = regionEntry.getValue();
+            if (LOG.isDebugEnabled())
+                LOG.debug("TRAF RCOV THREAD:Recovery Thread Processing region: " + new String(regionBytes));
+
+            // Progress warning on iteration 0 and then on every 10th iteration.
+            if ((recoveryIterations % 10 == 0) && LOG.isWarnEnabled()) {
+                //  Let's get the host name
+                String[] hostname = hostnamePort.split(",", 3);
+                if (hostname.length < 2) {
+                    throw new IllegalArgumentException("hostnamePort format is incorrect");
+                }
+                if (recoveryIterations == 0) {
+                    LOG.warn ("TRAF RCOV THREAD:Starting recovery with " + regions.size() +
+                         " regions to recover.  First region hostname: " + hostnamePort +
+                         " Recovery iterations: " + recoveryIterations);
+                }
+                else {
+                    LOG.warn("TRAF RCOV THREAD:Recovery thread encountered " + regions.size() +
+                         " regions to recover.  First region hostname: " + hostnamePort +
+                         " Recovery iterations: " + recoveryIterations);
+                }
+            }
+
+            try {
+                TxRecoverList = txnManager.recoveryRequest(hostnamePort, regionBytes, tmID);
+            }
+            catch (IOException e) {
+               // For all cases of Exception, we rely on the region to redrive the request.
+               // Likely there is nothing to recover, due to a stale region entry, but it is always safe to redrive.
+               // We log a warning event and delete the ZKNode entry.
+               LOG.warn("TRAF RCOV THREAD:Exception calling txnManager.recoveryRequest. " + "TM: " +
+                          tmID + " regionBytes: [" + regionBytes + "].  Deleting zookeeper region entry. \n exception: ", e);
+               zookeeper.deleteRegionEntry(regionEntry);
+
+               // In the case of NotServingRegionException we will repost the ZKNode after refreshing the table.
+               if ((e instanceof NotServingRegionException) || (e.getCause() instanceof NotServingRegionException)){
+                   // Create a local HTable object using the regionInfo
+                   // NOTE(review): this HTable is never closed here -- confirm whether
+                   // postAllRegionEntries() takes ownership or whether it should be closed.
+                   HTable table = new HTable(config, HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString());
+                   // Repost a zookeeper entry for all current regions in the table
+                   zookeeper.postAllRegionEntries(table);
+               }
+               // NOTE(review): TxRecoverList is still the empty list at this point, so
+               // the "Leftover Znode" branch below deletes the region entry a second
+               // time -- confirm this is intended after a repost.
+            } // IOException
+
+            if (TxRecoverList != null) {
+                if (LOG.isTraceEnabled()) LOG.trace("TRAF RCOV THREAD:size of TxRecoverList " + TxRecoverList.size());
+                if (TxRecoverList.size() == 0) {
+                     // First delete the zookeeper entry
+                     LOG.warn("TRAF RCOV THREAD:Leftover Znode  calling txnManager.recoveryRequest. " + "TM: " +
+                             tmID + " regionBytes: [" + regionBytes + "].  Deleting zookeeper region entry. ");
+                     zookeeper.deleteRegionEntry(regionEntry);
+                }
+                for (Long txid : TxRecoverList) {
+                   // Reuse the state built from an earlier region, if any.
+                   TransactionState ts = transactionStates.get(txid);
+                   if (ts == null) {
+                      ts = new TransactionState(txid);
+
+                      //Identify if DDL is part of this transaction and valid
+                      if(hbtx.useDDLTrans){
+                         TmDDL tmDDL = hbtx.getTmDDL();
+                         StringBuilder state = new StringBuilder ();
+                         tmDDL.getState(txid,state);
+                         if(state.toString().equals("VALID"))
+                            ts.setDDLTx(true);
+                      }
+                   }
+                   this.addRegionToTS(hostnamePort, regionBytes, ts);
+                   transactionStates.put(txid, ts);
+                }
+            }
+            else if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD:size od TxRecoverList is NULL ");
+        }
+
+        // Returned only after every region has been consulted.
+        return transactionStates;
+    }
+              
+    /**
+     * Builds recovery candidates for the "DDL only" case by asking TmDDL for
+     * the transaction ids that belong to this recovery thread's tmID.
+     *
+     * @return transaction id to TransactionState (each flagged as a DDL
+     *         transaction), or null when DDL transactions are disabled or TmDDL
+     *         lists no transaction for this tmID
+     * @throws IOException propagated from the TmDDL lookup
+     */
+    private  Map<Long, TransactionState> getTransactionsFromTmDDL()
+                                                throws IOException
+    {
+      if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: Checking for DDL only recovery");
+
+      // TMDDL is only consulted when DDL transactions are enabled.
+      if (!hbtx.useDDLTrans) {
+        return null;
+      }
+
+      // The ids returned are specific to the tmID owner. They may include txids:
+      //  1. currently in ACTIVE state; recoverTransactions() ignores txids that
+      //     are actively in progress.
+      //  2. whose regions have not yet asked for help through zookeeper, but
+      //     probably will (timing).
+      //  3. whose regions have already asked for help.
+      //  4. whose regions have already been serviced and which only need
+      //     recovery from the DDL perspective.
+      // Regions in cases 2 and 3 will ultimately seek help themselves, so they
+      // are not handled here. Case 4 is the one of interest; if it also involves
+      // DML regions it is fine to recover just the DDL here, independently of
+      // those regions.
+      //
+      // recoverTransactions() attempts recovery; it is a no-op for txids that
+      // have already completed, since ignoreUnknownTransactionException is enabled.
+      List<Long> ddlTxIds = hbtx.getTmDDL().getTxIdList(tmID);
+      if (ddlTxIds == null || ddlTxIds.isEmpty()) {
+        return null;
+      }
+
+      Map<Long, TransactionState> ddlTransactions = new HashMap<Long, TransactionState>();
+      for (Long txid : ddlTxIds) {
+        // Build a TransactionState marked as carrying DDL.
+        TransactionState ddlTs = new TransactionState(txid);
+        ddlTs.setDDLTx(true);
+        ddlTransactions.put(txid, ddlTs);
+      }
+      return ddlTransactions;
+    }
+
+    /**
+     * Redrives each in-doubt transaction in the supplied map to an outcome.
+     *
+     * For every entry the transaction is skipped when the TM still holds an
+     * ACTIVE TransactionState for it in memory. Otherwise its state is read
+     * through the audit object and the transaction is either committed again
+     * (tolerating UnknownTransactionException) or aborted. A status that is
+     * neither COMMITTED nor ABORTED is treated as an abort.
+     *
+     * When the redrive fails with UnsuccessfulDDLException or
+     * CommitUnsuccessfulException the transaction is marked "REDRIVE" in
+     * TmDDL and the loop moves on to the next entry.
+     *
+     * @param transactionStates transaction id to TransactionState map of the
+     *        candidates to recover; a null map is treated as "nothing to do"
+     * @throws IOException declared by this method; not caught here, so it
+     *         aborts the remaining recovery work of this call
+     */
+    private void recoverTransactions(Map<Long, TransactionState> transactionStates) throws IOException
+    {
+        // A null map means there is nothing to recover.
+        if (transactionStates == null) {
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt transaction size " + transactionStates.size());
+
+        for (Map.Entry<Long, TransactionState> tsEntry : transactionStates.entrySet()) {
+            // tsEntry holds a local TS object, not the one in the TM's main list.
+            TransactionState ts = tsEntry.getValue();
+            Long txID = ts.getTransactionId();
+
+            //It is possible for long prepare situations that involve multiple DDL
+            //operations, multiple prompts from RS is received. Hence check to see if there
+            //is a TS object in main TS list and transaction is still active.
+            //The main list is read exactly once: repeating the lookup after the
+            //null check would dereference null if the entry were removed in between.
+            TransactionState inMemoryTs = hbtx.mapTransactionStates.get(txID);
+            if (inMemoryTs != null) {
+                boolean stillActive = inMemoryTs.getStatus().toString().contains("ACTIVE");
+                if (LOG.isInfoEnabled()) {
+                    // The flag is still printed as 0/1 to keep the log format stable.
+                    LOG.info("TRAF RCOV THREAD: TID " + txID
+                            + " still has TS object in TM memory. TS details: "
+                            + inMemoryTs.toString()
+                            + " transactionAlive: " + (stillActive ? 1 : 0));
+                }
+                if (stillActive) {
+                    continue; //for loop
+                }
+            }
+
+            try {
+                // Fills in the status of the local ts object, which is examined below.
+                audit.getTransactionState(ts);
+                if (ts.getStatus().equals(TransState.STATE_COMMITTED.toString())) {
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("TRAF RCOV THREAD:Redriving commit for " + txID + " number of regions " + ts.getParticipatingRegions().size() +
+                                " and tolerating UnknownTransactionExceptions");
+                    txnManager.doCommit(ts, true /*ignore UnknownTransactionException*/);
+                    if(useTlog && useForgotten) {
+                        long nextAsn = tLog.getNextAuditSeqNum((int)TransactionState.getNodeId(txID));
+                        tLog.putSingleRecord(txID, ts.getCommitId(), "FORGOTTEN", null, forceForgotten, nextAsn);
+                    }
+                } else if (ts.getStatus().equals(TransState.STATE_ABORTED.toString())) {
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
+                    txnManager.abort(ts);
+                } else {
+                    if (LOG.isDebugEnabled())
+                        LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
+                    LOG.warn("Recovering transaction " + txID + ", status is not set to COMMITTED or ABORTED. Aborting.");
+                    txnManager.abort(ts);
+                }
+
+            } catch (UnsuccessfulDDLException ddle) {
+                LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + " Exception " , ddle);
+
+                //Note that there may not be anymore redrive triggers from region server point of view for DDL operation.
+                //Register this DDL transaction for subsequent redrive from Audit Control Event.
+                //TODO: Launch a new Redrive Thread out of auditControlPoint().
+                TmDDL tmDDL = hbtx.getTmDDL();
+                tmDDL.setState(txID,"REDRIVE");
+                // Reached only when setState() returned normally, so report the
+                // registration rather than an error.
+                LOG.warn("TRAF RCOV THREAD: txID " + txID + " registered as REDRIVE in TmDDL");
+            } catch (CommitUnsuccessfulException cue) {
+                LOG.error("CommitUnsuccessfulException encountered by Recovery Thread. Registering for retry. txID: " + txID + " Exception " , cue);
+                TmDDL tmDDL = hbtx.getTmDDL();
+                tmDDL.setState(txID,"REDRIVE");
+                // Reached only when setState() returned normally, so report the
+                // registration rather than an error.
+                LOG.warn("TRAF RCOV THREAD: txID " + txID + " registered as REDRIVE in TmDDL");
+            }
+        }
+    }//recoverTransactions()
+          
+   } //class RecoveryThread
+
 
+     
      //================================================================================
      // DTMCI Calls
      //================================================================================


[3/3] incubator-trafodion git commit: Merge [TRAFODION-2468] PR 993 DDL HA changes, part 2

Posted by db...@apache.org.
Merge [TRAFODION-2468] PR 993 DDL HA changes, part 2


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/f7e27c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/f7e27c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/f7e27c5d

Branch: refs/heads/master
Commit: f7e27c5d1b4eda4fcb428d76fb21a6959e5349ae
Parents: 3c1c4ad 8f3b836
Author: Dave Birdsall <db...@apache.org>
Authored: Wed Mar 22 01:53:30 2017 +0000
Committer: Dave Birdsall <db...@apache.org>
Committed: Wed Mar 22 01:53:30 2017 +0000

----------------------------------------------------------------------
 .../hbase/client/transactional/TmDDL.java       |  33 +-
 .../java/org/trafodion/dtm/HBaseTxClient.java   | 487 +++++++++++++------
 2 files changed, 367 insertions(+), 153 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-trafodion git commit: fix based on review comments.

Posted by db...@apache.org.
fix based on review comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/8f3b8367
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/8f3b8367
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/8f3b8367

Branch: refs/heads/master
Commit: 8f3b8367400790a1f0677dc41a0cfd2339079594
Parents: d133d62
Author: Prashant Vasudev <pr...@esgyn.com>
Authored: Tue Mar 21 20:23:29 2017 +0000
Committer: Prashant Vasudev <pr...@esgyn.com>
Committed: Tue Mar 21 20:23:29 2017 +0000

----------------------------------------------------------------------
 .../java/org/trafodion/dtm/HBaseTxClient.java   | 22 +++++++++++---------
 1 file changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8f3b8367/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index cef53b1..a967dac 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -1114,10 +1114,10 @@ public class HBaseTxClient {
                         boolean loopBack = false;
                         if(ddlOnlyRecoveryCheck)
                         {
+                          ddlOnlyRecoveryCheck = false;
                           transactionStates = getTransactionsFromTmDDL();
                           if(transactionStates != null)
                             recoverTransactions(transactionStates);
-                          ddlOnlyRecoveryCheck = false;
                         }
                         
                         do {
@@ -1396,17 +1396,19 @@ public class HBaseTxClient {
             } catch (UnsuccessfulDDLException ddle) {
                 LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , ddle);
   
-                //Note that there may not be anymore redrive triggers from region server point of view for DDL operation.
-                //Register this DDL transaction for subsequent redrive from Audit Control Event.
-                //TODO: Launch a new Redrive Thread out of auditControlPoint().
-                TmDDL tmDDL = hbtx.getTmDDL();
-                tmDDL.setState(txID,"REDRIVE");
-                LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
+                //Do not change the state of txId in tmDDL. Let the recovery thread
+                //detect this txID again and redrive. Reset flag to loop back and
+                //check for tmDDL again.
+                ddlOnlyRecoveryCheck = true;
+                
             } catch (CommitUnsuccessfulException cue) {
                 LOG.error("CommitUnsuccessfulException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , cue);
-                TmDDL tmDDL = hbtx.getTmDDL();
-                tmDDL.setState(txID,"REDRIVE");
-                LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
+                
+                //Do not change the state of txId in tmDDL. Let the recovery thread
+                //detect this txID again and redrive. Reset flag to loop back and
+                //check for tmDDL again.
+                ddlOnlyRecoveryCheck = true;
+
             }
         }
       }//recoverTransactions()