You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2017/03/22 01:55:45 UTC
[1/3] incubator-trafodion git commit: [TRAFODION-2468] DDL HA ,
step 2 changes(please refer jira).
Repository: incubator-trafodion
Updated Branches:
refs/heads/master 3c1c4ad4c -> f7e27c5d1
[TRAFODION-2468] DDL HA , step 2 changes(please refer jira).
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/d133d621
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/d133d621
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/d133d621
Branch: refs/heads/master
Commit: d133d6211b5c93bd3b79fee28db7d68b42df05ab
Parents: b5e7300
Author: Prashant Vasudev <pr...@esgyn.com>
Authored: Tue Mar 7 00:00:38 2017 +0000
Committer: Prashant Vasudev <pr...@esgyn.com>
Committed: Tue Mar 7 00:00:38 2017 +0000
----------------------------------------------------------------------
.../hbase/client/transactional/TmDDL.java | 33 +-
.../java/org/trafodion/dtm/HBaseTxClient.java | 485 +++++++++++++------
2 files changed, 365 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/d133d621/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
index 83eb945..af4c2e5 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
@@ -50,7 +50,8 @@ import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
public class TmDDL {
@@ -300,4 +301,34 @@ public class TmDDL {
Delete d = new Delete(Bytes.toBytes(lvTransid));
table.delete(d);
}
+
+ public List<Long> getTxIdList(short tmID) throws IOException {
+ byte [] value = null;
+ Scan s = new Scan();
+ s.setCaching(100);
+ s.setCacheBlocks(false);
+ ArrayList<Long> txIdList = new ArrayList<Long>();
+ ResultScanner ss = table.getScanner(s);
+ try{
+ for (Result r : ss) {
+ Long txid = Bytes.toLong(r.getRow());
+ if(TransactionState.getNodeId(txid) != tmID)
+ {
+ //not owned by this tmID
+ continue;
+ }
+ value = r.getValue(TDDL_FAMILY, TDDL_STATE);
+ if((value != null) && (value.length > 0) &&
+ (Bytes.toString(value).equals("VALID") ||
+ Bytes.toString(value).equals("REDRIVE")))
+ txIdList.add(txid);
+ }
+ }
+ finally {
+ ss.close();
+ }
+
+ return txIdList;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/d133d621/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index dd6bffe..cef53b1 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -911,6 +911,98 @@ public class HBaseTxClient {
/**
* Thread to gather recovery information for regions that need to be recovered
*/
+ /*
+ * DDL specific recovery operation use cases are as follows:
+ * In case of phase 1 prepare , phase 2 commit, DDL operations are
+ * completed once the region operations are complete. However, in case of TM
+ * going down and restarting, some of the state information is lost.
+ * 1. If the DDL operation also involves DML operations, regions that have
+ * in doubt transaction request for help. Recovery thread here reconstructs
+ * the TS state and also DDL operation state from TMDDL and redrives the operation.
+ *
+ * 2. If all the regions have completed their operations and if only DDL operation
+ * is pending and TM goes down and restarted, there are no regions that
+ * would seek help to redrive the operations. if there is pending DDL operation
+ * it will be left starving as there are no triggers to redrive the operation.
+ * To handle this case, every time TM starts, as part of recovery thread start
+ * a general scan of TMDDL is made to check for owned transIDs, and those that
+ * have active DDL are checked against the state of the transaction and appropriately redriven.
+ *
+ * 3. Failure of TM and restart of TM can happen at any state of DDL operation
+ * in progress and before that operation is recorded as complete. One way to
+ * accurately keep note of this operation in progress is to record the operation
+ * before and after the operation. For this, the table against which the operation
+ * is being performed would be the key in a new log, we choose another table
+ * called TmDDLObject table. This acts as a global semaphore for DDL table
+ * operation. Recovery Thread as part of its startup processing always checks
+ * against TmDDLObject table and if it owns the transaction, continues to
+ * recover the DDL operation.
+ *
+ */
+ /* DDL Use cases and detailed info :
+
+ Use case Tx result phase 1 phase 2
+ ---------- ---------- ------- ------------
+ Create Table commit no op no op
+ Create Table rollback no op disable ,delete
+
+ create table ,insert commit no op no op
+ create table, insert rollback no op disable, delete Notes:need flag for preclose, make this flag persistent in tmddl and region
+
+
+ Drop Table commit disable delete
+ Drop Table rollback no op no op
+
+ Insert, drop table commit disable delete Notes: flag for preclose, make this flag persistent in tmddl and region.
+ Insert, drop table rollback no op no op
+
+
+ 0. Commit thread and recovery thread do the same thing, either commit or abort.
+
+ 1. Commit thread is aware of other regions. Recovery thread is not aware of other regions.
+ Commit thread doing commit or abort will perform end point calls to all regions involved followed by DDL.
+ Recovery thread doing commit or abort will perform DDL operation separate and region operation separate.
+ Question is can DDL and DML commit processing be done in parallel or decoupled. Looks like its ok, MD dml
+ and actual DDL have same decision either to commit or abort.
+ One concern is visibility of the table based on corresponding MD changes being recovered( commit or abort)
+ at different times.
+
+ With create table scenario, unwinding DDL and MD at different times does not matter since
+ creation of new table from a new thread will get errors from Hmaster since table still exists in hmaster.
+ One caveat is TM thinking the table is created, but TM dies before creating it. When recovery thread attempts to recover abort,
+ it goes ahead and deletes the table (which some other thread already succeeded in creating), which is very dangerous.
+ This scenario is handled as part of step 3 below.
+
+ With drop table scenario, unwinding drop DDL and MD at slightly different times is acceptable since:
+
+ In general, whether recovery thread or commit thread is driving the commit or abort, visibility of the table
+ is the same as originally intended.
+
+ Having DDL table and DML in the same table, having insertsDrop and createInserts flag persistent is needed for
+ redrive of commit or abort.
+
+
+ 2. Commit thread and recovery thread cannot overlap or do same execution in parallel.
+ Transaction status in Tlog must be the sole decision maker for recovery thread to redrive. If recovery thread
+ finds a active transaction state object in memory, it assumes commit thread is handling it.
+
+
+ 3. Failure of TM and restart of TM can happen at any state of DDL operation in progress and before that operation is recorded as complete.
+
+ In case of create table, TM dies before create table, tx is now aborted. Visibility of table is "Does not exist". However
+ the recovery thread should not attempt to delete the table if it really did not create it ( some other thread might have created it in parallel).
+ This is where we need a global semaphore on table name.
+
+ In case of create table, TM dies after create table, tx is now aborted. Visibility of table is "Does not exist". recovery thread
+ attempting to delete the table is ok here.
+
+ In case of drop table, the visibility of the table at Hmaster prevents other threads from creating a duplicate table. There is scenario of
+ accidentally dropping a duplicate table.
+
+ In general, a global semaphore kind of method helps to reduce the complexity and recover from various stages of failures.
+
+ */
+
private static class RecoveryThread extends Thread{
static final int SLEEP_DELAY = 1000; // Initially set to run every 1sec
private int sleepTimeInt = 0;
@@ -928,6 +1020,7 @@ public class HBaseTxClient {
private boolean useTlog;
HBaseTxClient hbtx;
private static int envSleepTimeInt;
+ private boolean ddlOnlyRecoveryCheck = true;
static {
String sleepTime = System.getenv("TMRECOV_SLEEP");
@@ -997,16 +1090,36 @@ public class HBaseTxClient {
ts.addRegion(loc);
}
- @Override
- public void run() {
-
+ @Override
+ public void run() {
+
+ //Start of recovery thread could be due to only two scenarios.
+ //1. TM coming up as part of node up. In this case recovery thread
+ // assigned node id is the same as own node.
+ //2. As part of TM going down, LDTM starting a new recovery thread
+ // corresponding to the node id of TM that just went down. In this
+ // case, recovery thread node id is that of remote node that went down.
+ //
+ //In both scenarios, there may be DDL only recovery or DDL/DML region
+ //recovery. DDL/DML region recovery is driven from the affected regions.
+ //In the loop below, DDL only recovery is checked first. This check
+ //is performed only once in the beginning. The rest of the loop
+ //performs the DDL/DML region recovery check.
+
while (this.continueThread) {
try {
skipSleep = false;
Map<String, byte[]> regions = null;
- Map<Long, TransactionState> transactionStates =
- new HashMap<Long, TransactionState>();
+ Map<Long, TransactionState> transactionStates = null;
boolean loopBack = false;
+ if(ddlOnlyRecoveryCheck)
+ {
+ transactionStates = getTransactionsFromTmDDL();
+ if(transactionStates != null)
+ recoverTransactions(transactionStates);
+ ddlOnlyRecoveryCheck = false;
+ }
+
do {
try {
loopBack = false;
@@ -1022,152 +1135,10 @@ public class HBaseTxClient {
skipSleep = true;
recoveryIterations++;
- if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt region size " + regions.size());
- for (Map.Entry<String, byte[]> regionEntry : regions.entrySet()) {
- List<Long> TxRecoverList = new ArrayList<Long>();
- String hostnamePort = regionEntry.getKey();
- byte[] regionBytes = regionEntry.getValue();
- if (LOG.isDebugEnabled())
- LOG.debug("TRAF RCOV THREAD:Recovery Thread Processing region: " + new String(regionBytes));
- if (recoveryIterations == 0) {
- if(LOG.isWarnEnabled()) {
- // Let's get the host name
- final byte [] delimiter = ",".getBytes();
- String[] hostname = hostnamePort.split(new String(delimiter), 3);
- if (hostname.length < 2) {
- throw new IllegalArgumentException("hostnamePort format is incorrect");
- }
-
- LOG.warn ("TRAF RCOV THREAD:Starting recovery with " + regions.size() +
- " regions to recover. First region hostname: " + hostnamePort +
- " Recovery iterations: " + recoveryIterations);
- }
- }
- else {
- if(recoveryIterations % 10 == 0) {
- if(LOG.isWarnEnabled()) {
- // Let's get the host name
- final byte [] delimiter = ",".getBytes();
- String[] hostname = hostnamePort.split(new String(delimiter), 3);
- if (hostname.length < 2) {
- throw new IllegalArgumentException("hostnamePort format is incorrect");
- }
- LOG.warn("TRAF RCOV THREAD:Recovery thread encountered " + regions.size() +
- " regions to recover. First region hostname: " + hostnamePort +
- " Recovery iterations: " + recoveryIterations);
- }
- }
- }
- try {
- TxRecoverList = txnManager.recoveryRequest(hostnamePort, regionBytes, tmID);
- }
- catch (IOException e) {
- // For all cases of Exception, we rely on the region to redrive the request.
- // Likely there is nothing to recover, due to a stale region entry, but it is always safe to redrive.
- // We log a warning event and delete the ZKNode entry.
- LOG.warn("TRAF RCOV THREAD:Exception calling txnManager.recoveryRequest. " + "TM: " +
- tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. \n exception: ", e);
- zookeeper.deleteRegionEntry(regionEntry);
-
- // In the case of NotServingRegionException we will repost the ZKNode after refreshing the table.
- if ((e instanceof NotServingRegionException) || (e.getCause() instanceof NotServingRegionException)){
- // Create a local HTable object using the regionInfo
- HTable table = new HTable(config, HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString());
- // Repost a zookeeper entry for all current regions in the table
- zookeeper.postAllRegionEntries(table);
- }
- } // IOException
-
- if (TxRecoverList != null) {
- if (LOG.isDebugEnabled()) LOG.trace("TRAF RCOV THREAD:size of TxRecoverList " + TxRecoverList.size());
- if (TxRecoverList.size() == 0) {
- // First delete the zookeeper entry
- LOG.warn("TRAF RCOV THREAD:Leftover Znode calling txnManager.recoveryRequest. " + "TM: " +
- tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. ");
- zookeeper.deleteRegionEntry(regionEntry);
- }
- for (Long txid : TxRecoverList) {
- TransactionState ts = transactionStates.get(txid);
- if (ts == null) {
- ts = new TransactionState(txid);
-
- //Identify if DDL is part of this transaction and valid
- if(hbtx.useDDLTrans){
- TmDDL tmDDL = hbtx.getTmDDL();
- StringBuilder state = new StringBuilder ();
- tmDDL.getState(txid,state);
- if(state.toString().equals("VALID"))
- ts.setDDLTx(true);
- }
- }
- this.addRegionToTS(hostnamePort, regionBytes, ts);
- transactionStates.put(txid, ts);
- }
- }
- else if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD:size od TxRecoverList is NULL ");
- }
- if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt transaction size " + transactionStates.size());
- for (Map.Entry<Long, TransactionState> tsEntry : transactionStates.entrySet()) {
- int isTransactionStillAlive = 0;
- TransactionState ts = tsEntry.getValue();
- Long txID = ts.getTransactionId();
- // TransactionState ts = new TransactionState(txID);
-
- //It is possible for long prepare situations that involve multiple DDL
- //operations, multiple prompts from RS is received. Hence check to see if there
- //is a TS object in main TS list and transaction is still active.
- //Note that tsEntry is local TS object.
- if (hbtx.mapTransactionStates.get(txID) != null) {
- if (hbtx.mapTransactionStates.get(txID).getStatus().toString().contains("ACTIVE")) {
- isTransactionStillAlive = 1;
- }
- if (LOG.isInfoEnabled())
- LOG.info("TRAF RCOV THREAD: TID " + txID
- + " still has TS object in TM memory. TS details: "
- + hbtx.mapTransactionStates.get(txID).toString()
- + " transactionAlive: " + isTransactionStillAlive);
- if(isTransactionStillAlive == 1)
- continue; //for loop
- }
-
- try {
- audit.getTransactionState(ts);
- if (ts.getStatus().equals(TransState.STATE_COMMITTED.toString())) {
- if (LOG.isDebugEnabled())
- LOG.debug("TRAF RCOV THREAD:Redriving commit for " + txID + " number of regions " + ts.getParticipatingRegions().size() +
- " and tolerating UnknownTransactionExceptions");
- txnManager.doCommit(ts, true /*ignore UnknownTransactionException*/);
- if(useTlog && useForgotten) {
- long nextAsn = tLog.getNextAuditSeqNum((int)TransactionState.getNodeId(txID));
- tLog.putSingleRecord(txID, ts.getCommitId(), "FORGOTTEN", null, forceForgotten, nextAsn);
- }
- } else if (ts.getStatus().equals(TransState.STATE_ABORTED.toString())) {
- if (LOG.isDebugEnabled())
- LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
- txnManager.abort(ts);
- } else {
- if (LOG.isDebugEnabled())
- LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
- LOG.warn("Recovering transaction " + txID + ", status is not set to COMMITTED or ABORTED. Aborting.");
- txnManager.abort(ts);
- }
-
- } catch (UnsuccessfulDDLException ddle) {
- LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , ddle);
-
- //Note that there may not be anymore redrive triggers from region server point of view for DDL operation.
- //Register this DDL transaction for subsequent redrive from Audit Control Event.
- //TODO: Launch a new Redrive Thread out of auditControlPoint().
- TmDDL tmDDL = hbtx.getTmDDL();
- tmDDL.setState(txID,"REDRIVE");
- LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
- } catch (CommitUnsuccessfulException cue) {
- LOG.error("CommitUnsuccessfulException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , cue);
- TmDDL tmDDL = hbtx.getTmDDL();
- tmDDL.setState(txID,"REDRIVE");
- LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
- }
- }
+ transactionStates = getTransactionsFromRegions(regions);
+
+ if(transactionStates != null)
+ recoverTransactions(transactionStates);
}
else {
@@ -1232,8 +1203,218 @@ public class HBaseTxClient {
}
if(LOG.isDebugEnabled()) LOG.debug("Exiting recovery thread for tm ID: " + tmID);
}
- }
+
+    /**
+     * Asks every in-doubt region which transactions it needs recovered and
+     * groups the answers by transaction id.
+     *
+     * For each region entry a recovery request is sent through txnManager.
+     * Every transaction id returned gets one TransactionState in the result
+     * map, and the region is added to that state, so a transaction reported by
+     * several regions ends up with all of them as participants. When DDL
+     * transactions are enabled and TmDDL reports the state "VALID" for a
+     * transaction, its TransactionState is flagged as a DDL transaction.
+     *
+     * Region entries whose request fails with an IOException, or that report
+     * nothing to recover, have their zookeeper entry deleted. For a
+     * NotServingRegionException the entries of all current regions of the
+     * table are posted again.
+     *
+     * @param regions in-doubt regions, keyed by host name/port string, with the
+     *                serialized region info as value
+     * @return map from transaction id to its TransactionState, covering all
+     *         regions in {@code regions}; empty (never null) when there is
+     *         nothing to recover
+     * @throws IOException if a TmDDL lookup or table refresh fails
+     * @throws KeeperException if a zookeeper operation fails
+     * @throws DeserializationException if the region info cannot be parsed
+     * @throws IllegalArgumentException if warn logging is enabled and a
+     *         host name/port key has fewer than two comma separated parts
+     */
+    private Map<Long, TransactionState> getTransactionsFromRegions(
+                           Map<String, byte[]> regions)
+                           throws IOException, KeeperException,
+                           DeserializationException
+    {
+        if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt region size " + regions.size());
+
+        // Shared by all regions of this pass: the map must outlive a single loop
+        // iteration so that every region is visited and so that a transaction
+        // seen in several regions accumulates them in one TransactionState.
+        Map<Long, TransactionState> transactionStates =
+                        new HashMap<Long, TransactionState>();
+
+        for (Map.Entry<String, byte[]> regionEntry : regions.entrySet()) {
+            List<Long> TxRecoverList = new ArrayList<Long>();
+            String hostnamePort = regionEntry.getKey();
+            byte[] regionBytes = regionEntry.getValue();
+            if (LOG.isDebugEnabled())
+                LOG.debug("TRAF RCOV THREAD:Recovery Thread Processing region: " + new String(regionBytes));
+
+            // Progress is reported on iteration 0 and on every tenth iteration;
+            // only the leading text of the message differs between the two.
+            if ((recoveryIterations % 10 == 0) && LOG.isWarnEnabled()) {
+                // Let's get the host name
+                String[] hostname = hostnamePort.split(",", 3);
+                if (hostname.length < 2) {
+                    throw new IllegalArgumentException("hostnamePort format is incorrect");
+                }
+                String lead = (recoveryIterations == 0)
+                        ? "TRAF RCOV THREAD:Starting recovery with "
+                        : "TRAF RCOV THREAD:Recovery thread encountered ";
+                LOG.warn(lead + regions.size() +
+                         " regions to recover. First region hostname: " + hostnamePort +
+                         " Recovery iterations: " + recoveryIterations);
+            }
+
+            try {
+                TxRecoverList = txnManager.recoveryRequest(hostnamePort, regionBytes, tmID);
+            }
+            catch (IOException e) {
+                // For all cases of Exception, we rely on the region to redrive the request.
+                // Likely there is nothing to recover, due to a stale region entry, but it is always safe to redrive.
+                // We log a warning event and delete the ZKNode entry.
+                LOG.warn("TRAF RCOV THREAD:Exception calling txnManager.recoveryRequest. " + "TM: " +
+                         tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. \n exception: ", e);
+                zookeeper.deleteRegionEntry(regionEntry);
+
+                // In the case of NotServingRegionException we will repost the ZKNode after refreshing the table.
+                if ((e instanceof NotServingRegionException) || (e.getCause() instanceof NotServingRegionException)){
+                    // Create a local HTable object using the regionInfo
+                    // NOTE(review): this HTable is never closed here - confirm whether
+                    // postAllRegionEntries takes ownership of it.
+                    HTable table = new HTable(config, HRegionInfo.parseFrom(regionBytes).getTable().getNameAsString());
+                    // Repost a zookeeper entry for all current regions in the table
+                    zookeeper.postAllRegionEntries(table);
+                }
+            } // IOException
+
+            if (TxRecoverList != null) {
+                // Logged at the level the guard tests for.
+                if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD:size of TxRecoverList " + TxRecoverList.size());
+                if (TxRecoverList.size() == 0) {
+                    // First delete the zookeeper entry
+                    LOG.warn("TRAF RCOV THREAD:Leftover Znode calling txnManager.recoveryRequest. " + "TM: " +
+                             tmID + " regionBytes: [" + regionBytes + "]. Deleting zookeeper region entry. ");
+                    zookeeper.deleteRegionEntry(regionEntry);
+                }
+                for (Long txid : TxRecoverList) {
+                    TransactionState ts = transactionStates.get(txid);
+                    if (ts == null) {
+                        ts = new TransactionState(txid);
+
+                        //Identify if DDL is part of this transaction and valid
+                        if(hbtx.useDDLTrans){
+                            TmDDL tmDDL = hbtx.getTmDDL();
+                            StringBuilder state = new StringBuilder ();
+                            tmDDL.getState(txid,state);
+                            if(state.toString().equals("VALID"))
+                                ts.setDDLTx(true);
+                        }
+                    }
+                    this.addRegionToTS(hostnamePort, regionBytes, ts);
+                    transactionStates.put(txid, ts);
+                }
+            }
+            else if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD:size od TxRecoverList is NULL ");
+        }
+
+        return transactionStates;
+    }
+
+    /**
+     * Builds the set of transactions that need a DDL only recovery check.
+     *
+     * The transaction ids come from TmDDL and are restricted to those owned by
+     * this thread's tmID. Each one is wrapped in a fresh TransactionState that
+     * is flagged as a DDL transaction.
+     *
+     * The ids returned by TmDDL may belong to transactions that are:
+     * 1. currently ACTIVE. recoverTransactions() takes care of ignoring
+     *    transactions that are still in progress.
+     * 2. waiting on regions that have not yet asked for help through
+     *    zookeeper, but probably will (timing).
+     * 3. waiting on regions that have already asked for help.
+     * 4. finished as far as their regions are concerned and only in need of
+     *    recovery from the DDL perspective.
+     * Regions of cases 2 and 3 will ultimately seek help themselves, so they
+     * are not handled here. Case 4 is the one of interest; if it also involves
+     * DML regions it is fine to recover only the DDL here, independent of the
+     * DML regions.
+     *
+     * recoverTransactions() only attempts recovery: it is a no-op for
+     * transactions that have completed for some reason, since some regions may
+     * already be done and ignoreUnknownTransactionException is enabled.
+     *
+     * @return map from transaction id to its TransactionState, or null when
+     *         DDL transactions are disabled or TmDDL has no matching entries
+     * @throws IOException if reading TmDDL fails
+     */
+    private Map<Long, TransactionState> getTransactionsFromTmDDL()
+                                         throws IOException
+    {
+        if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: Checking for DDL only recovery");
+
+        // TMDDL is only consulted when DDL transactions are enabled.
+        if (!hbtx.useDDLTrans) {
+            return null;
+        }
+
+        // Ids returned here are already filtered down to this tmID.
+        List<Long> ownedTxIds = hbtx.getTmDDL().getTxIdList(tmID);
+        if (ownedTxIds == null || ownedTxIds.size() == 0) {
+            return null;
+        }
+
+        Map<Long, TransactionState> ddlStates = new HashMap<Long, TransactionState>();
+        for (Long txid : ownedTxIds) {
+            TransactionState ddlTs = new TransactionState(txid);
+            ddlTs.setDDLTx(true);
+            ddlStates.put(txid, ddlTs);
+        }
+        return ddlStates;
+    }
+
+ /**
+  * Redrives commit or abort for every transaction in the given map.
+  *
+  * For each entry:
+  * - If the TM still holds a TransactionState for the id in
+  *   hbtx.mapTransactionStates and its status text contains "ACTIVE", the
+  *   transaction is skipped.
+  * - Otherwise the status is loaded with audit.getTransactionState(ts).
+  *   A COMMITTED transaction is committed again with
+  *   UnknownTransactionException ignored and, when both useTlog and
+  *   useForgotten are set, a "FORGOTTEN" record is written to the tLog.
+  *   An ABORTED transaction is aborted again. Any other status is logged
+  *   with a warning and the transaction is aborted.
+  * - On UnsuccessfulDDLException or CommitUnsuccessfulException the
+  *   transaction's TmDDL state is set to "REDRIVE".
+  *
+  * NOTE(review): the "Error calling TmDDL putRow Redrive" error is logged
+  * whenever setState() returns normally, i.e. also when nothing went wrong
+  * with the TmDDL update - confirm whether that is intended.
+  *
+  * @param transactionStates transactions to recover, keyed by transaction id;
+  *                          must not be null
+  * @throws IOException propagated from the calls made during recovery
+  */
+ private void recoverTransactions(Map<Long, TransactionState> transactionStates) throws IOException
+ {
+ if (LOG.isDebugEnabled()) LOG.debug("TRAF RCOV THREAD: in-doubt transaction size " + transactionStates.size());
+
+ for (Map.Entry<Long, TransactionState> tsEntry : transactionStates.entrySet()) {
+ int isTransactionStillAlive = 0;
+ TransactionState ts = tsEntry.getValue();
+ Long txID = ts.getTransactionId();
+ // TransactionState ts = new TransactionState(txID);
+
+ //It is possible for long prepare situations that involve multiple DDL
+ //operations, multiple prompts from RS is received. Hence check to see if there
+ //is a TS object in main TS list and transaction is still active.
+ //Note that tsEntry is local TS object.
+ if (hbtx.mapTransactionStates.get(txID) != null) {
+ if (hbtx.mapTransactionStates.get(txID).getStatus().toString().contains("ACTIVE")) {
+ isTransactionStillAlive = 1;
+ }
+ if (LOG.isInfoEnabled())
+ LOG.info("TRAF RCOV THREAD: TID " + txID
+ + " still has TS object in TM memory. TS details: "
+ + hbtx.mapTransactionStates.get(txID).toString()
+ + " transactionAlive: " + isTransactionStillAlive);
+ if(isTransactionStillAlive == 1)
+ continue; //for loop
+ }
+
+ try {
+ // Fills in ts with the status used by the branches below.
+ audit.getTransactionState(ts);
+ if (ts.getStatus().equals(TransState.STATE_COMMITTED.toString())) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("TRAF RCOV THREAD:Redriving commit for " + txID + " number of regions " + ts.getParticipatingRegions().size() +
+ " and tolerating UnknownTransactionExceptions");
+ txnManager.doCommit(ts, true /*ignore UnknownTransactionException*/);
+ if(useTlog && useForgotten) {
+ long nextAsn = tLog.getNextAuditSeqNum((int)TransactionState.getNodeId(txID));
+ tLog.putSingleRecord(txID, ts.getCommitId(), "FORGOTTEN", null, forceForgotten, nextAsn);
+ }
+ } else if (ts.getStatus().equals(TransState.STATE_ABORTED.toString())) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
+ txnManager.abort(ts);
+ } else {
+ if (LOG.isDebugEnabled())
+ LOG.debug("TRAF RCOV THREAD:Redriving abort for " + txID);
+ LOG.warn("Recovering transaction " + txID + ", status is not set to COMMITTED or ABORTED. Aborting.");
+ txnManager.abort(ts);
+ }
+
+ } catch (UnsuccessfulDDLException ddle) {
+ LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , ddle);
+
+ //Note that there may not be anymore redrive triggers from region server point of view for DDL operation.
+ //Register this DDL transaction for subsequent redrive from Audit Control Event.
+ //TODO: Launch a new Redrive Thread out of auditControlPoint().
+ TmDDL tmDDL = hbtx.getTmDDL();
+ tmDDL.setState(txID,"REDRIVE");
+ LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
+ } catch (CommitUnsuccessfulException cue) {
+ LOG.error("CommitUnsuccessfulException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , cue);
+ TmDDL tmDDL = hbtx.getTmDDL();
+ tmDDL.setState(txID,"REDRIVE");
+ LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
+ }
+ }
+ }//recoverTransactions()
+
+ } //class RecoveryThread
+
+
//================================================================================
// DTMCI Calls
//================================================================================
[3/3] incubator-trafodion git commit: Merge [TRAFODION-2468] PR 993
DDL HA changes, part 2
Posted by db...@apache.org.
Merge [TRAFODION-2468] PR 993 DDL HA changes, part 2
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/f7e27c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/f7e27c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/f7e27c5d
Branch: refs/heads/master
Commit: f7e27c5d1b4eda4fcb428d76fb21a6959e5349ae
Parents: 3c1c4ad 8f3b836
Author: Dave Birdsall <db...@apache.org>
Authored: Wed Mar 22 01:53:30 2017 +0000
Committer: Dave Birdsall <db...@apache.org>
Committed: Wed Mar 22 01:53:30 2017 +0000
----------------------------------------------------------------------
.../hbase/client/transactional/TmDDL.java | 33 +-
.../java/org/trafodion/dtm/HBaseTxClient.java | 487 +++++++++++++------
2 files changed, 367 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-trafodion git commit: fix based on review comments.
Posted by db...@apache.org.
fix based on review comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/8f3b8367
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/8f3b8367
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/8f3b8367
Branch: refs/heads/master
Commit: 8f3b8367400790a1f0677dc41a0cfd2339079594
Parents: d133d62
Author: Prashant Vasudev <pr...@esgyn.com>
Authored: Tue Mar 21 20:23:29 2017 +0000
Committer: Prashant Vasudev <pr...@esgyn.com>
Committed: Tue Mar 21 20:23:29 2017 +0000
----------------------------------------------------------------------
.../java/org/trafodion/dtm/HBaseTxClient.java | 22 +++++++++++---------
1 file changed, 12 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/8f3b8367/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index cef53b1..a967dac 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -1114,10 +1114,10 @@ public class HBaseTxClient {
boolean loopBack = false;
if(ddlOnlyRecoveryCheck)
{
+ ddlOnlyRecoveryCheck = false;
transactionStates = getTransactionsFromTmDDL();
if(transactionStates != null)
recoverTransactions(transactionStates);
- ddlOnlyRecoveryCheck = false;
}
do {
@@ -1396,17 +1396,19 @@ public class HBaseTxClient {
} catch (UnsuccessfulDDLException ddle) {
LOG.error("UnsuccessfulDDLException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , ddle);
- //Note that there may not be anymore redrive triggers from region server point of view for DDL operation.
- //Register this DDL transaction for subsequent redrive from Audit Control Event.
- //TODO: Launch a new Redrive Thread out of auditControlPoint().
- TmDDL tmDDL = hbtx.getTmDDL();
- tmDDL.setState(txID,"REDRIVE");
- LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
+ //Do not change the state of txId in tmDDL. Let the recovery thread
+ //detect this txID again and redrive. Reset flag to loop back and
+ //check for tmDDL again.
+ ddlOnlyRecoveryCheck = true;
+
} catch (CommitUnsuccessfulException cue) {
LOG.error("CommitUnsuccessfulException encountered by Recovery Thread. Registering for retry. txID: " + txID + "Exception " , cue);
- TmDDL tmDDL = hbtx.getTmDDL();
- tmDDL.setState(txID,"REDRIVE");
- LOG.error("TRAF RCOV THREAD:Error calling TmDDL putRow Redrive");
+
+ //Do not change the state of txId in tmDDL. Let the recovery thread
+ //detect this txID again and redrive. Reset flag to loop back and
+ //check for tmDDL again.
+ ddlOnlyRecoveryCheck = true;
+
}
}
}//recoverTransactions()