You are viewing a plain-text version of this content; the hyperlink to the canonical version is not preserved in this text.
Posted to commits@trafodion.apache.org by sa...@apache.org on 2017/05/12 20:25:38 UTC
[03/22] incubator-trafodion git commit: This is a large contribution
of changes from the Esgyn TransactionManager and libraries, which are collectively
much better tested and hardened than Trafodion's,
but are too numerous and complex to cherry-pick individually.
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index d7be3ea..21ca319 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@ -94,6 +94,7 @@ public class HBaseTxClient {
private int stallWhere;
private IdTm idServer;
private static final int ID_TM_SERVER_TIMEOUT = 1000;
+ private static int myClusterId = 0;
public enum AlgorithmType{
MVCC, SSCC
@@ -180,7 +181,7 @@ public class HBaseTxClient {
try {
trxManager = TransactionManager.getInstance(config, connection);
} catch (IOException e ){
- LOG.error("Unable to create TransactionManager, throwing exception", e);
+ LOG.error("trxManager Initialization failure throwing exception", e);
throw e;
}
@@ -300,7 +301,9 @@ public class HBaseTxClient {
this,
useForgotten,
forceForgotten,
- useTlog);
+ useTlog,
+ false,
+ false);
recovThread.start();
}
if (LOG.isTraceEnabled()) LOG.trace("Exit init()");
@@ -328,7 +331,9 @@ public class HBaseTxClient {
this,
useForgotten,
forceForgotten,
- useTlog);
+ useTlog,
+ false,
+ true);
newRecovThread.start();
mapRecoveryThreads.put(nodeID, recovThread);
if(LOG.isTraceEnabled()) LOG.trace("nodeDown -- mapRecoveryThreads size: " + mapRecoveryThreads.size());
@@ -366,6 +371,10 @@ public class HBaseTxClient {
return TransReturnCode.RET_OK.getShort();
}
+ public static Map<Long, TransactionState> getMap() {
+ return mapTransactionStates;
+ }
+
public long beginTransaction(final long transactionId) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter beginTransaction, txid: " + transactionId);
@@ -383,11 +392,11 @@ public class HBaseTxClient {
if (tx2 != null) {
// Some other thread added the transaction while we were creating one. It's already in the
// map, so we can use the existing one.
- if (LOG.isDebugEnabled()) LOG.debug("HBaseTxClient:beginTransaction, found TransactionState object while creating a new one " + tx2);
+ if (LOG.isDebugEnabled()) LOG.debug("beginTransaction, found TransactionState object while creating a new one " + tx2);
tx = tx2;
}
else {
- if (LOG.isDebugEnabled()) LOG.debug("HBaseTxClient:beginTransaction, adding new TransactionState to map " + tx);
+ if (LOG.isDebugEnabled()) LOG.debug("beginTransaction, adding new TransactionState to map " + tx);
mapTransactionStates.put(transactionId, tx);
}
}
@@ -397,7 +406,7 @@ public class HBaseTxClient {
}
public short abortTransaction(final long transactionID) throws IOException {
- if (LOG.isDebugEnabled()) LOG.debug("Enter abortTransaction, txid: " + transactionID);
+ if (LOG.isDebugEnabled()) LOG.debug("Enter abortTransaction, txId: " + transactionID);
TransactionState ts = mapTransactionStates.get(transactionID);
if(ts == null) {
@@ -408,10 +417,11 @@ public class HBaseTxClient {
try {
ts.setStatus(TransState.STATE_ABORTED);
if (useTlog) {
- tLog.putSingleRecord(transactionID, -1, "ABORTED", ts.getParticipatingRegions(), false);
+ tLog.putSingleRecord(transactionID, ts.getStartId(), -1, TransState.STATE_ABORTED.toString(), ts.getParticipatingRegions(), ts.hasPlaceHolder(), false); //force flush
}
- } catch(IOException e) {
- LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " tLog.putRecord: EXCEPTION", e);
+ }
+ catch(IOException e) {
+ LOG.error("Returning from HBaseTxClient:abortTransaction, txid: " + transactionID + " tLog.putRecord: EXCEPTION ", e);
return TransReturnCode.RET_EXCEPTION.getShort();
}
@@ -465,12 +475,7 @@ public class HBaseTxClient {
}
if (useTlog && useForgotten) {
- if (forceForgotten) {
- tLog.putSingleRecord(transactionID, -1, "FORGOTTEN", null, true);
- }
- else {
- tLog.putSingleRecord(transactionID, -1, "FORGOTTEN", null, false);
- }
+ tLog.putSingleRecord(transactionID, ts.getStartId(), -1, TransState.STATE_FORGOTTEN_ABORT.toString(), ts.getParticipatingRegions(), ts.hasPlaceHolder(), forceForgotten); // forced flush?
}
// mapTransactionStates.remove(transactionID);
@@ -481,11 +486,17 @@ public class HBaseTxClient {
public short prepareCommit(long transactionId) throws
TransactionManagerException,
IOException{
- if (LOG.isDebugEnabled()) LOG.debug("Enter prepareCommit, txid: " + transactionId);
- if (LOG.isTraceEnabled()) LOG.trace("mapTransactionStates " + mapTransactionStates + " entries " + mapTransactionStates.size());
- TransactionState ts = mapTransactionStates.get(transactionId);
- if(ts == null) {
- LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOTX.toString());
+ if (LOG.isDebugEnabled()) LOG.debug("Enter prepareCommit"
+ + ", txId: " + transactionId
+ + ", #txstate entries " + mapTransactionStates.size()
+ );
+ TransactionState ts = mapTransactionStates.get(transactionId);
+
+ if (ts == null) {
+ LOG.error("Returning from prepareCommit"
+ + ", txId: " + transactionId
+ + ", retval: " + TransReturnCode.RET_NOTX.toString()
+ );
return TransReturnCode.RET_NOTX.getShort();
}
@@ -494,13 +505,13 @@ public class HBaseTxClient {
if (LOG.isDebugEnabled()) LOG.debug("prepareCommit, [ " + ts + " ], result " + result + ((result == TransactionalReturn.COMMIT_OK_READ_ONLY)?", Read-Only":""));
switch (result) {
case TransactionalReturn.COMMIT_OK:
- if (LOG.isTraceEnabled()) LOG.trace("Exit OK prepareCommit, txid: " + transactionId);
+ if (LOG.isTraceEnabled()) LOG.trace("Exit OK prepareCommit, txId: " + transactionId);
return TransReturnCode.RET_OK.getShort();
case TransactionalReturn.COMMIT_OK_READ_ONLY:
synchronized(mapLock) {
mapTransactionStates.remove(transactionId);
}
- if (LOG.isTraceEnabled()) LOG.trace("Exit OK_READ_ONLY prepareCommit, txid: " + transactionId);
+ if (LOG.isTraceEnabled()) LOG.trace("Exit OK_READ_ONLY prepareCommit, txId: " + transactionId);
return TransReturnCode.RET_READONLY.getShort();
case TransactionalReturn.COMMIT_UNSUCCESSFUL:
if(!ts.getRecordedException().isEmpty())
@@ -530,28 +541,32 @@ public class HBaseTxClient {
throw t;
}
catch (CommitUnsuccessfulException e) {
- LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException", e);
+ LOG.error("Returning from prepareCommit, txId: " + transactionId + " retval: " + TransReturnCode.RET_NOCOMMITEX.toString() + " CommitUnsuccessfulException");
throw new TransactionManagerException(e,
TransReturnCode.RET_NOCOMMITEX.getShort());
}
catch (IOException e) {
- LOG.error("Returning from HBaseTxClient:prepareCommit, txid: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException", e);
+ LOG.error("Returning from prepareCommit, txId: " + transactionId + " retval: " + TransReturnCode.RET_IOEXCEPTION.toString() + " IOException");
throw new TransactionManagerException(e,
TransReturnCode.RET_IOEXCEPTION.getShort());
}
}
- public short doCommit(long transactionId) throws IOException, CommitUnsuccessfulException {
- if (LOG.isDebugEnabled()) LOG.debug("Enter doCommit, txid: " + transactionId);
- TransactionState ts = mapTransactionStates.get(transactionId);
+ public short doCommit(long transactionId) throws IOException {
+ if (LOG.isDebugEnabled()) LOG.debug("Enter doCommit, txId: " + transactionId);
+ TransactionState ts = mapTransactionStates.get(transactionId);
- if(ts == null) {
- LOG.error("Returning from HBaseTxClient:doCommit, (null tx) retval: " + TransReturnCode.RET_NOTX.toString() + " txid: " + transactionId);
+ if(ts == null) {
+ LOG.error("Returning from doCommit, (null tx) retval: "
+ + TransReturnCode.RET_NOTX.toString()
+ + ", txId: " + transactionId
+ );
return TransReturnCode.RET_NOTX.getShort();
}
// Set the commitId
IdTmId commitId = null;
+ long commitIdVal = -1;
if (TRANSACTION_ALGORITHM == AlgorithmType.SSCC) {
try {
commitId = new IdTmId();
@@ -562,24 +577,39 @@ public class HBaseTxClient {
LOG.error("doCommit: IdTm threw exception " , exc);
throw new CommitUnsuccessfulException("doCommit: IdTm threw exception " , exc);
}
+ commitIdVal = commitId.val;
}
-
- final long commitIdVal = (TRANSACTION_ALGORITHM == AlgorithmType.SSCC) ? commitId.val : -1;
if (LOG.isTraceEnabled()) LOG.trace("doCommit setting commitId (" + commitIdVal + ") for tx: " + ts.getTransactionId());
ts.setCommitId(commitIdVal);
- try {
- ts.setStatus(TransState.STATE_COMMITTED);
- if (useTlog) {
- tLog.putSingleRecord(transactionId, commitIdVal, "COMMITTED", ts.getParticipatingRegions(), true);
- }
- } catch(IOException e) {
- LOG.error("Returning from HBaseTxClient:doCommit, txid: " + transactionId + " tLog.putRecord: EXCEPTION ", e);
- return TransReturnCode.RET_EXCEPTION.getShort();
+ if (stallWhere == 4) {
+ if (LOG.isInfoEnabled())LOG.info("Stalling in phase 2a (before TLOG write) for doCommit for transaction: " + transactionId);
+ boolean loopBack = false;
+ do
+ {
+ try {
+ loopBack = false;
+ Thread.sleep(600000); // Initially set to run every 5 min
+ } catch (InterruptedException ie) {
+ loopBack = true;
+ }
+ } while (loopBack);
}
+ //try {
+ ts.setStatus(TransState.STATE_COMMITTING);
+ if (useTlog) {
+ try {
+ tLog.putSingleRecord(transactionId, ts.getStartId(), commitIdVal, TransState.STATE_COMMITTED.toString(), ts.getParticipatingRegions(), ts.hasPlaceHolder(), true);
+ ts.setStatus(TransState.STATE_COMMITTED);
+ }
+ catch (IOException e) {
+ LOG.error("doCommit: Local TLOG write threw exception during commit " , e);
+ throw new RuntimeException(e);
+ }
+ }
if ((stallWhere == 2) || (stallWhere == 3)) {
- LOG.info("Stalling in phase 2 for doCommit");
+ if (LOG.isInfoEnabled())LOG.info("Stalling in phase 2 for doCommit for transaction: " + transactionId);
boolean loopBack = false;
do {
try {
@@ -592,13 +622,15 @@ public class HBaseTxClient {
}
try {
+ if (LOG.isTraceEnabled()) LOG.trace("doCommit, calling trxManager.doCommit(" + ts.getTransactionId() + ")" );
trxManager.doCommit(ts);
} catch (CommitUnsuccessfulException e) {
- LOG.error("Returning from HBaseTxClient:doCommit, retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException" + " txid: " + transactionId, e);
+ LOG.error("Returning from doCommit, transaction: " + transactionId
+ + ", retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException");
return TransReturnCode.RET_EXCEPTION.getShort();
}
catch (UnsuccessfulDDLException ddle) {
- LOG.error("FATAL DDL Exception from HBaseTxClient:doCommit, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txid: " + transactionId, ddle);
+ LOG.error("FATAL DDL Exception from doCommit, WAITING INDEFINETLY !! retval: " + TransReturnCode.RET_EXCEPTION.toString() + " UnsuccessfulDDLException" + " txId: " + transactionId);
//Reaching here means several attempts to perform the DDL operation has failed in commit phase.
//Generally if only DML operation is involved, returning error causes TM to call completeRequest()
@@ -616,7 +648,9 @@ public class HBaseTxClient {
loopBack = false;
commitDDLLock.wait();
} catch(InterruptedException ie) {
- LOG.warn("Interrupting commitDDLLock.wait, but retrying ", ie);
+ LOG.warn("Interrupting commitDDLLock.wait"
+ + ", txId: " + transactionId
+ + ", retrying ", ie);
loopBack = true;
}
} while (loopBack);
@@ -624,38 +658,37 @@ public class HBaseTxClient {
return TransReturnCode.RET_EXCEPTION.getShort();
}
if (useTlog && useForgotten) {
- if (forceForgotten) {
- tLog.putSingleRecord(transactionId, commitIdVal, "FORGOTTEN", null, true);
- }
- else {
- tLog.putSingleRecord(transactionId, commitIdVal, "FORGOTTEN", null, false);
- }
+ tLog.putSingleRecord(transactionId, ts.getStartId(), commitIdVal, TransState.STATE_FORGOTTEN_COMMITTED.toString(), ts.getParticipatingRegions(), ts.hasPlaceHolder(), forceForgotten); // forced flush?
}
-// mapTransactionStates.remove(transactionId);
-
if (LOG.isTraceEnabled()) LOG.trace("Exit doCommit, retval(ok): " + TransReturnCode.RET_OK.toString() +
- " txid: " + transactionId + " mapsize: " + mapTransactionStates.size());
+ " txId: " + transactionId + " mapsize: " + mapTransactionStates.size());
return TransReturnCode.RET_OK.getShort();
}
- public short completeRequest(long transactionId) throws IOException, CommitUnsuccessfulException {
- if (LOG.isDebugEnabled()) LOG.debug("Enter completeRequest, txid: " + transactionId);
+ public short completeRequest(long transactionId)
+ throws IOException, CommitUnsuccessfulException
+ {
+ if (LOG.isDebugEnabled()) LOG.debug("Enter completeRequest"
+ + ", txId: " + transactionId
+ );
TransactionState ts = mapTransactionStates.get(transactionId);
- if(ts == null) {
- LOG.error("Returning from HBaseTxClient:completeRequest, (null tx) retval: " + TransReturnCode.RET_NOTX.toString() + " txid: " + transactionId);
- return TransReturnCode.RET_NOTX.getShort();
- }
-
+ if (ts == null) {
+ LOG.error("Returning from completeRequest, (null tx) retval: "
+ + TransReturnCode.RET_NOTX.toString()
+ + ", txId: " + transactionId
+ );
+ return TransReturnCode.RET_NOTX.getShort();
+ }
+
boolean loopBack = false;
do {
try {
- if (LOG.isTraceEnabled()) LOG.trace("TEMP completeRequest Calling CompleteRequest() Txid :" + transactionId);
loopBack = false;
ts.completeRequest();
} catch(InterruptedException ie) {
- LOG.warn("Interrupting HBaseTxClient:completeRequest but retrying, ts.completeRequest: txid: " + transactionId + ", EXCEPTION: ", ie);
+ LOG.warn("Interrupting completeRequest but retrying, ts.completeRequest: txid: " + transactionId + ", EXCEPTION: ", ie);
loopBack = true;
}
} while (loopBack);
@@ -681,11 +714,8 @@ public class HBaseTxClient {
commitErr = doCommit(transactionId);
if (commitErr != TransReturnCode.RET_OK.getShort()) {
LOG.error("doCommit for committed transaction " + transactionId + " failed with error " + commitErr);
- // It is a violation of 2 PC protocol to try to abort the transaction after prepare
+ // It is a violation of 2 PC protocol to try to abort the transaction after commit write
return commitErr;
-// abortErr = abortTransaction(transactionId);
-// if (LOG.isDebugEnabled()) LOG.debug("tryCommit commit failed and was aborted. Commit error " +
-// commitErr + ", Abort error " + abortErr);
}
if (LOG.isTraceEnabled()) LOG.trace("TEMP tryCommit Calling CompleteRequest() Txid :" + transactionId);
@@ -704,7 +734,7 @@ public class HBaseTxClient {
mapTransactionStates.remove(transactionId);
}
- if (LOG.isDebugEnabled()) LOG.debug("Exit completeRequest txid: " + transactionId + " mapsize: " + mapTransactionStates.size());
+ if (LOG.isDebugEnabled()) LOG.debug("Exit completeRequest txId: " + transactionId + " mapsize: " + mapTransactionStates.size());
return TransReturnCode.RET_OK.getShort();
}
@@ -713,11 +743,11 @@ public class HBaseTxClient {
TransactionState ts;
HTableDescriptor htdesc = null;
- if (LOG.isTraceEnabled()) LOG.trace("Enter callCreateTable, txid: [" + transactionId + "], htbldesc bytearray: " + pv_htbldesc + "desc in hex: " + Hex.encodeHexString(pv_htbldesc));
+ if (LOG.isTraceEnabled()) LOG.trace("Enter callCreateTable, txId: [" + transactionId + "], htbldesc bytearray: " + pv_htbldesc + "desc in hex: " + Hex.encodeHexString(pv_htbldesc));
ts = mapTransactionStates.get(transactionId);
if(ts == null) {
- LOG.error("Returning from HBaseTxClient:callCreateTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txid: " + transactionId);
+ LOG.error("Returning from callCreateTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txId: " + transactionId);
return TransReturnCode.RET_NOTX.getShort();
}
try {
@@ -745,11 +775,11 @@ public class HBaseTxClient {
TransactionState ts;
String strTblName = new String(pv_tblname, "UTF-8");
- if (LOG.isTraceEnabled()) LOG.trace("Enter callAlterTable, txid: [" + transactionId + "], tableName: " + strTblName);
+ if (LOG.isTraceEnabled()) LOG.trace("Enter callAlterTable, txId: [" + transactionId + "], tableName: " + strTblName);
ts = mapTransactionStates.get(transactionId);
if(ts == null) {
- LOG.error("Returning from HBaseTxClient:callAlterTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txid: " + transactionId);
+ LOG.error("Returning from callAlterTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txId: " + transactionId);
return TransReturnCode.RET_NOTX.getShort();
}
@@ -762,11 +792,11 @@ public class HBaseTxClient {
TransactionState ts;
String strTblName = new String(pv_tblname, "UTF-8");
- if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterTruncateOnAbort, txid: [" + transactionId + "], tablename: " + strTblName);
+ if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterTruncateOnAbort, txId: [" + transactionId + "], tablename: " + strTblName);
ts = mapTransactionStates.get(transactionId);
if(ts == null) {
- LOG.error("Returning from HBaseTxClient:callRegisterTruncateOnAbort, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txid: " + transactionId);
+ LOG.error("Returning from callRegisterTruncateOnAbort, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txId: " + transactionId);
return TransReturnCode.RET_NOTX.getShort();
}
@@ -779,11 +809,11 @@ public class HBaseTxClient {
TransactionState ts;
String strTblName = new String(pv_tblname, "UTF-8");
- if (LOG.isTraceEnabled()) LOG.trace("Enter callDropTable, txid: [" + transactionId + "], tablename: " + strTblName);
+ if (LOG.isTraceEnabled()) LOG.trace("Enter callDropTable, txId: [" + transactionId + "], tablename: " + strTblName);
ts = mapTransactionStates.get(transactionId);
if(ts == null) {
- LOG.error("Returning from HBaseTxClient:callDropTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txid: " + transactionId);
+ LOG.error("Returning from callDropTable, (null tx) retval: " + TransReturnCode.RET_NOTX.getShort() + " txId: " + transactionId);
return TransReturnCode.RET_NOTX.getShort();
}
@@ -798,8 +828,14 @@ public class HBaseTxClient {
long pv_startcode,
byte[] pv_regionInfo) throws IOException {
String hostname = new String(pv_hostname);
- if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterRegion, txid: [" + transactionId + "], startId: " + startId + ", port: "
- + pv_port + ", hostname: " + hostname + ", reg info len: " + pv_regionInfo.length + " " + new String(pv_regionInfo, "UTF-8"));
+ if (LOG.isTraceEnabled()) LOG.trace("Enter callRegisterRegion, "
+ + "txId: [" + transactionId + "]"
+ + ", startId: " + startId
+ + ", port: " + pv_port
+ + ", hostname: " + hostname
+ + ", startcode: " + pv_startcode
+ + ", reg info len: " + pv_regionInfo.length
+ + " " + new String(pv_regionInfo, "UTF-8"));
HRegionInfo lv_regionInfo;
try {
@@ -818,7 +854,7 @@ public class HBaseTxClient {
TransactionState ts = mapTransactionStates.get(transactionId);
if(ts == null) {
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion transactionId (" + transactionId +
+ if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion transactionId (" + transactionId +
") not found in mapTransactionStates of size: " + mapTransactionStates.size());
try {
ts = trxManager.beginTransaction(transactionId);
@@ -831,38 +867,35 @@ public class HBaseTxClient {
if (ts2 != null) {
// Some other thread added the transaction while we were creating one. It's already in the
// map, so we can use the existing one.
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion, found TransactionState object while creating a new one " + ts2);
+ if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion, found TransactionState object while creating a new one " + ts2);
ts = ts2;
}
else {
ts.setStartId(startId);
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion new transactionState created: " + ts );
+ if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion new transactionState created: " + ts );
}
}// end synchronized
}
else {
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion existing transactionState found: " + ts );
+ if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion existing transactionState found: " + ts );
if (ts.getStartId() == -1) {
ts.setStartId(startId);
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient:callRegisterRegion reset startId for transactionState: " + ts );
+ if (LOG.isTraceEnabled()) LOG.trace("callRegisterRegion reset startId for transactionState: " + ts );
}
}
try {
trxManager.registerRegion(ts, regionLocation);
} catch (IOException e) {
- LOG.error("HBaseTxClient:callRegisterRegion exception in registerRegion call, txid: " + transactionId +
- " retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException " , e);
+ LOG.error("callRegisterRegion exception in registerRegion call, txId: " + transactionId +
+ " retval: " + TransReturnCode.RET_EXCEPTION.toString() + " IOException " + e);
return TransReturnCode.RET_EXCEPTION.getShort();
}
if (LOG.isDebugEnabled()) LOG.debug("RegisterRegion adding table name " + regionTableName);
ts.addTableName(regionTableName);
- // Removing unnecessary put back into the map
- // mapTransactionStates.put(ts.getTransactionId(), ts);
-
- if (LOG.isTraceEnabled()) LOG.trace("Exit callRegisterRegion, txid: [" + transactionId + "] with mapsize: "
+ if (LOG.isTraceEnabled()) LOG.trace("Exit callRegisterRegion, txId: [" + transactionId + "] with mapsize: "
+ mapTransactionStates.size());
return TransReturnCode.RET_OK.getShort();
}
@@ -893,7 +926,7 @@ public class HBaseTxClient {
if (LOG.isTraceEnabled()) LOG.trace("Enter addControlPoint");
long result = 0L;
if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient calling tLog.addControlPoint with mapsize " + mapTransactionStates.size());
- result = tLog.addControlPoint(mapTransactionStates);
+ result = tLog.addControlPoint(myClusterId, mapTransactionStates, true);
Long lowestStartId = Long.MAX_VALUE;
for(ConcurrentHashMap.Entry<Long, TransactionState> entry : mapTransactionStates.entrySet()){
TransactionState value;
@@ -1019,6 +1052,8 @@ public class HBaseTxClient {
private boolean useForgotten;
private boolean forceForgotten;
private boolean useTlog;
+ private boolean leadtm;
+ private boolean takeover;
HBaseTxClient hbtx;
private static int envSleepTimeInt;
private boolean ddlOnlyRecoveryCheck = true;
@@ -1038,36 +1073,43 @@ public class HBaseTxClient {
HBaseTxClient hbtx,
boolean useForgotten,
boolean forceForgotten,
- boolean useTlog) {
+ boolean useTlog,
+ boolean leadtm,
+ boolean takeover) {
this(audit, zookeeper, txnManager);
this.hbtx = hbtx;
this.useForgotten = useForgotten;
this.forceForgotten = forceForgotten;
this.useTlog= useTlog;
- }
+ this.leadtm = leadtm;
+ this.takeover = takeover;
+ if(LOG.isDebugEnabled()) LOG.debug("Traf Recovery Thread starts for DTM " + tmID +
+ " LDTM " + leadtm + " Takeover " + takeover);
+
+ }
/**
*
* @param audit
* @param zookeeper
* @param txnManager
*/
- public RecoveryThread(TmAuditTlog audit,
+ public RecoveryThread(TmAuditTlog audit,
HBaseTmZK zookeeper,
TransactionManager txnManager)
- {
+ {
this.audit = audit;
this.zookeeper = zookeeper;
this.txnManager = txnManager;
this.inDoubtList = new HashSet<Long> ();
this.tmID = zookeeper.getTMID();
this.sleepTimeInt = envSleepTimeInt;
- }
+ }
- public void stopThread() {
+ public void stopThread() {
this.continueThread = false;
- }
+ }
- private void addRegionToTS(String hostnamePort, byte[] regionInfo, TransactionState ts) throws IOException{
+ private void addRegionToTS(String hostnamePort, byte[] regionInfo, TransactionState ts) throws IOException{
HRegionInfo regionInfoLoc; // = new HRegionInfo();
final byte [] delimiter = ",".getBytes();
String[] result = hostnamePort.split(new String(delimiter), 3);
@@ -1141,10 +1183,10 @@ public class HBaseTxClient {
if(transactionStates != null)
recoverTransactions(transactionStates);
- }
+ } // region not null
else {
if (recoveryIterations > 0) {
- if(LOG.isInfoEnabled()) LOG.info("Recovery completed for TM" + tmID);
+ if(LOG.isDebugEnabled()) LOG.debug("Recovery completed for TM" + tmID);
}
recoveryIterations = -1;
}
@@ -1156,7 +1198,7 @@ public class HBaseTxClient {
}
retryCount = 0;
} catch (InterruptedException e) {
- LOG.error("Error in recoveryThread: " + e);
+ LOG.error("Error in recoveryThread: ", e);
}
} catch (IOException e) {
int possibleRetries = 4;
@@ -1351,6 +1393,7 @@ public class HBaseTxClient {
for (Map.Entry<Long, TransactionState> tsEntry : transactionStates.entrySet()) {
int isTransactionStillAlive = 0;
+ TransactionState ts1 = null;
TransactionState ts = tsEntry.getValue();
Long txID = ts.getTransactionId();
// TransactionState ts = new TransactionState(txID);
@@ -1379,9 +1422,16 @@ public class HBaseTxClient {
LOG.debug("TRAF RCOV THREAD:Redriving commit for " + txID + " number of regions " + ts.getParticipatingRegions().size() +
" and tolerating UnknownTransactionExceptions");
txnManager.doCommit(ts, true /*ignore UnknownTransactionException*/);
- if(useTlog && useForgotten) {
- long nextAsn = tLog.getNextAuditSeqNum((int)TransactionState.getNodeId(txID));
- tLog.putSingleRecord(txID, ts.getCommitId(), "FORGOTTEN", null, forceForgotten, nextAsn);
+ if(useTlog) {
+ long nextAsn = tLog.getNextAuditSeqNum(myClusterId, (int)TransactionState.getNodeId(txID));
+ tLog.putSingleRecord(txID,
+ ts.getStartId(),
+ ts.getCommitId(),
+ TransState.STATE_RECOVERY_COMMITTED.toString(),
+ ts.getParticipatingRegions(),
+ false,
+ forceForgotten,
+ nextAsn);
}
} else if (ts.getStatus().equals(TransState.STATE_ABORTED.toString())) {
if (LOG.isDebugEnabled())
@@ -1436,7 +1486,7 @@ public class HBaseTxClient {
TransactionState value;
int tnum = 0; // Transaction number
- if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient::callRequestRegionInfo:: start\n");
+ if (LOG.isTraceEnabled()) LOG.trace(":callRequestRegionInfo:: start\n");
HashMapArray hm = new HashMapArray();
@@ -1448,40 +1498,64 @@ public class HBaseTxClient {
TransactionState ts = mapTransactionStates.get(id);
final Set<TransactionRegionLocation> regions = ts.getParticipatingRegions();
+ // TableName
Iterator<TransactionRegionLocation> it = regions.iterator();
-
- while(it.hasNext()) {
- TransactionRegionLocation trl = it.next();
- tablename = trl.getRegionInfo().getTable().getNameAsString();
- if(tablename.contains("TRAFODION._MD_."))
- continue;
- encoded_region_name = trl.getRegionInfo().getEncodedName();
- region_name = trl.getRegionInfo().getRegionNameAsString();
- boolean is_offline_bool = trl.getRegionInfo().isOffline();
- is_offline = String.valueOf(is_offline_bool);
- region_id = String.valueOf(trl.getRegionInfo().getRegionId());
- thn = String.valueOf(trl.getHostname());
- hostname = thn.substring(0, thn.length()-1);
- port = String.valueOf(trl.getPort());
- startkey = Bytes.equals(trl.getRegionInfo().getStartKey(), HConstants.EMPTY_START_ROW) ?
- "INFINITE" : Hex.encodeHexString(trl.getRegionInfo().getStartKey());
- endkey = Bytes.equals(trl.getRegionInfo().getEndKey(), HConstants.EMPTY_END_ROW) ?
- "INFINITE" : Hex.encodeHexString(trl.getRegionInfo().getEndKey());
-
- StringBuilder inputStr = new StringBuilder();
- inputStr.append(tablename).append(";");
- inputStr.append(encoded_region_name).append(";");
- inputStr.append(region_name).append(";");
- inputStr.append(region_id).append(";");
- inputStr.append(hostname).append(";");
- inputStr.append(port).append(";");
- inputStr.append(startkey).append(";");
- inputStr.append(endkey);
- hm.appendRegionInfo(id, inputStr.toString());
+ tablename = it.next().getRegionInfo().getTable().getNameAsString();
+ while(it.hasNext()){
+ tablename = tablename + ";" + it.next().getRegionInfo().getTable().getNameAsString();
+ }
+ hm.addElement(tnum, "TableName", tablename);
- }
- tnum = tnum + 1;
- }
+ // Encoded Region Name
+ Iterator<TransactionRegionLocation> it2 = regions.iterator();
+ encoded_region_name = it2.next().getRegionInfo().getEncodedName();
+ while(it2.hasNext()){
+ encoded_region_name = encoded_region_name + ";" + it2.next().getRegionInfo().getTable().getNameAsString();
+ }
+ hm.addElement(tnum, "EncodedRegionName", encoded_region_name);
+
+ // Region Name
+ Iterator<TransactionRegionLocation> it3 = regions.iterator();
+ region_name = it3.next().getRegionInfo().getRegionNameAsString();
+ while(it3.hasNext()){
+ region_name = region_name + ";" + it3.next().getRegionInfo().getTable().getNameAsString();
+ }
+ hm.addElement(tnum, "RegionName", region_name);
+
+ // Region Offline
+ Iterator<TransactionRegionLocation> it4 = regions.iterator();
+ boolean is_offline_bool = it4.next().getRegionInfo().isOffline();
+ is_offline = String.valueOf(is_offline_bool);
+ hm.addElement(tnum, "RegionOffline", is_offline);
+
+ // Region ID
+ Iterator<TransactionRegionLocation> it5 = regions.iterator();
+ region_id = String.valueOf(it5.next().getRegionInfo().getRegionId());
+ while(it5.hasNext()){
+ region_id = region_id + ";" + it5.next().getRegionInfo().getRegionId();
+ }
+ hm.addElement(tnum, "RegionID", region_id);
+
+ // Hostname
+ Iterator<TransactionRegionLocation> it6 = regions.iterator();
+ thn = String.valueOf(it6.next().getHostname());
+ hostname = thn.substring(0, thn.length()-1);
+ while(it6.hasNext()){
+ thn = String.valueOf(it6.next().getHostname());
+ hostname = hostname + ";" + thn.substring(0, thn.length()-1);
+ }
+ hm.addElement(tnum, "Hostname", hostname);
+
+ // Port
+ Iterator<TransactionRegionLocation> it7 = regions.iterator();
+ port = String.valueOf(it7.next().getPort());
+ while(it7.hasNext()){
+ port = port + ";" + String.valueOf(it7.next().getPort());
+ }
+ hm.addElement(tnum, "Port", port);
+
+ tnum = tnum + 1;
+ }
if (LOG.isTraceEnabled()) LOG.trace("HBaseTxClient::callRequestRegionInfo:: end size: " + hm.getSize());
return hm;
}