You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by su...@apache.org on 2015/12/08 17:01:55 UTC
[2/3] incubator-trafodion git commit: [TRAFODION-1663] Prepare, Commit,
and Abort requests from TransactionManager now carry a participant
number that helps track the specific request in the
RegionEndpointCoprocessor causing a problem when an UnknownTransactionException is thrown.
[TRAFODION-1663]
Prepare, Commit, and Abort requests from TransactionManager now carry a participant number that helps
track the specific request in the RegionEndpointCoprocessor causing a problem when an
UnknownTransactionException is thrown.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/84af9603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/84af9603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/84af9603
Branch: refs/heads/master
Commit: 84af9603c6ddb5f935ba8e1a535ba4dc28d1988b
Parents: fbfab1e
Author: Sean Broeder <sb...@edev03.esgyn.local>
Authored: Mon Dec 7 17:03:32 2015 +0000
Committer: Sean Broeder <sb...@edev03.esgyn.local>
Committed: Mon Dec 7 17:03:32 2015 +0000
----------------------------------------------------------------------
core/sqf/src/seatrans/hbase-trx/pom.xml.apache | 4 +
.../transactional/TransactionManager.java | 313 +++---
.../transactional/TrxRegionEndpoint.java | 70 +-
.../generated/SsccRegionProtos.java | 131 +--
.../generated/TrxRegionProtos.java | 1020 ++++++++++++++----
.../hbase-trx/src/main/protobuf/TrxRegion.proto | 11 +-
6 files changed, 1118 insertions(+), 431 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/84af9603/core/sqf/src/seatrans/hbase-trx/pom.xml.apache
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/pom.xml.apache b/core/sqf/src/seatrans/hbase-trx/pom.xml.apache
index c717479..d98e561 100755
--- a/core/sqf/src/seatrans/hbase-trx/pom.xml.apache
+++ b/core/sqf/src/seatrans/hbase-trx/pom.xml.apache
@@ -63,6 +63,8 @@
<!--<hbase.version>0.98.3-hadoop2</hbase.version>-->
<!--<hbase.version>0.98.3-hadoop1</hbase.version>-->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <protobuf.version>2.5.0</protobuf.version>
+ <protocVersion>2.5.0</protocVersion>
<java.version>1.7</java.version>
<trx-suffix>apache</trx-suffix>
</properties>
@@ -338,6 +340,8 @@ if we can combine these profiles somehow -->
<goal>protoc</goal>
</goals>
<configuration>
+ <protobuf.version>2.5.0</protobuf.version>
+ <protocVersion>2.5.0</protocVersion>
<imports>
<param>${basedir}/src/main/protobuf</param>
<param>${basedir}/hbase-protocol/src/main/protobuf</param>
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/84af9603/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
index f8f5eaf..b9832a1 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
@@ -223,7 +223,6 @@ public class TransactionManager {
return g_TransactionManager;
}
-
/* increment/deincrement for positive value */
/* This method copied from o.a.h.h.utils.Bytes */
public static byte [] binaryIncrementPos(byte [] value, long amount) {
@@ -257,7 +256,7 @@ public class TransactionManager {
hbadmin = new HBaseAdmin(config);
}
catch(Exception e) {
- System.out.println("ERROR: Unable to obtain HBase accessors, Exiting");
+ System.out.println("ERROR: Unable to obtain HBase accessors, Exiting " + e);
e.printStackTrace();
System.exit(1);
}
@@ -281,7 +280,7 @@ public class TransactionManager {
table = new HTable(location.getRegionInfo().getTable(), connection, cp_tpe);
} catch(IOException e) {
e.printStackTrace();
- LOG.error("Error obtaining HTable instance");
+ LOG.error("Error obtaining HTable instance " + e);
table = null;
}
startKey = location.getRegionInfo().getStartKey();
@@ -299,7 +298,11 @@ public class TransactionManager {
* Return : Always 0, can ignore
* Purpose : Call commit for a given regionserver
*/
- public Integer doCommitX(final byte[] regionName, final long transactionId, final long commitId, final boolean ignoreUnknownTransactionException) throws CommitUnsuccessfulException, IOException {
+ public Integer doCommitX(final byte[] regionName,
+ final long transactionId,
+ final long commitId,
+ final int participantNum,
+ final boolean ignoreUnknownTransaction) throws CommitUnsuccessfulException, IOException {
boolean retry = false;
boolean refresh = false;
@@ -310,8 +313,9 @@ public class TransactionManager {
do {
try {
- if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- ENTRY txid: " + transactionId +
- " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException);
+ if (LOG.isDebugEnabled()) LOG.debug("doCommitX -- ENTRY txid: " + transactionId
+ + " participantNum " + participantNum
+ + " ignoreUnknownTransaction: " + ignoreUnknownTransaction);
Batch.Call<TrxRegionService, CommitResponse> callable =
new Batch.Call<TrxRegionService, CommitResponse>() {
ServerRpcController controller = new ServerRpcController();
@@ -322,8 +326,9 @@ public class TransactionManager {
public CommitResponse call(TrxRegionService instance) throws IOException {
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequest.Builder builder = CommitRequest.newBuilder();
builder.setTransactionId(transactionId);
+ builder.setParticipantNum(participantNum);
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName))); //ByteString.copyFromUtf8(Bytes.toString(regionName)));
- builder.setIgnoreUnknownTransactionException(ignoreUnknownTransactionException);
+ builder.setIgnoreUnknownTransactionException(ignoreUnknownTransaction);
instance.commit(controller, builder.build(), rpcCallback);
return rpcCallback.get();
@@ -333,16 +338,18 @@ public class TransactionManager {
Map<byte[], CommitResponse> result = null;
try {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- before coprocessorService txid: " + transactionId +
- " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException + " table: " +
+ " ignoreUnknownTransaction: " + ignoreUnknownTransaction + " table: " +
table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
} catch (Throwable e) {
- String msg = "ERROR occurred while calling doCommitX coprocessor service in doCommitX";
- LOG.error(msg + ":" + e);
+ String msg = new String ("ERROR occurred while calling doCommitX coprocessor service in doCommitX for transaction: "
+ + transactionId + " participantNum " + participantNum + " Exception: " + e);
+ LOG.error(msg);
throw new Exception(msg);
}
if(result.size() == 0) {
- LOG.error("doCommitX, received incorrect result size: " + result.size() + " txid: " + transactionId);
+ LOG.error("doCommitX, received incorrect result size: " + result.size() + " txid: "
+ + transactionId + " location: " + location.getRegionInfo().getRegionNameAsString());
refresh = true;
retry = true;
}
@@ -351,8 +358,9 @@ public class TransactionManager {
for (CommitResponse cresponse : result.values()){
if(cresponse.getHasException()) {
String exceptionString = new String (cresponse.getException().toString());
+ LOG.error("doCommitX - exceptionString: " + exceptionString);
if (exceptionString.contains("UnknownTransactionException")) {
- if (ignoreUnknownTransactionException == true) {
+ if (ignoreUnknownTransaction == true) {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring UnknownTransactionException in cresponse");
}
else {
@@ -372,8 +380,9 @@ public class TransactionManager {
for (CommitResponse cresponse : result.values()){
if(cresponse.getHasException()) {
String exceptionString = new String (cresponse.getException().toString());
+ LOG.error("doCommitX - exceptionString: " + exceptionString);
if (exceptionString.contains("UnknownTransactionException")) {
- if (ignoreUnknownTransactionException == true) {
+ if (ignoreUnknownTransaction == true) {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring UnknownTransactionException in cresponse");
}
else {
@@ -395,18 +404,20 @@ public class TransactionManager {
}
catch (UnknownTransactionException ute) {
- LOG.error("Got unknown exception in doCommitX for transaction: " + transactionId + " " + ute);
+ LOG.error("Got unknown exception in doCommitX by participant " + participantNum
+ + " for transaction: " + transactionId + " " + ute);
transactionState.requestPendingCountDec(true);
throw new UnknownTransactionException();
}
catch (Exception e) {
if(e.toString().contains("Asked to commit a non-pending transaction")) {
- if (LOG.isDebugEnabled()) LOG.debug("doCommitX will not retry: " + e);
+ LOG.error("doCommitX transaction: "
+ + transactionId + " will not retry: " + e);
refresh = false;
retry = false;
}
else {
- LOG.error("doCommitX retrying due to Exception: " + e);
+ LOG.error("doCommitX retrying transaction: " + transactionId + " due to Exception: " + e);
refresh = true;
retry = true;
}
@@ -415,8 +426,6 @@ public class TransactionManager {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- //String lv_node = lv_hrl.getHostname();
- //int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + " endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -429,14 +438,11 @@ public class TransactionManager {
throw new CommitUnsuccessfulException("Exceeded retry attempts (" + retryCount + ") in doCommitX for transaction: " + transactionId);
}
-// if ((location.getRegionInfo().getEncodedName().compareTo(lv_hri.getEncodedName()) != 0) || // Encoded name is different
-// (location.getHostname().regionMatches(0, lv_node, 0, lv_length) == false)) { // Node is different
- if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- " + table.toString() + " location being refreshed");
- if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- lv_hri: " + lv_hri);
- if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- location.getRegionInfo(): " + location.getRegionInfo());
- table.getRegionLocation(startKey, true);
-// }
- if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- setting retry, count: " + retryCount);
+ if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- " + table.toString() + " location being refreshed");
+ if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- lv_hri: " + lv_hri);
+ if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- location.getRegionInfo(): " + location.getRegionInfo());
+ table.getRegionLocation(startKey, true);
+ if (LOG.isWarnEnabled()) LOG.warn("doCommitX -- setting retry, count: " + retryCount);
refresh = false;
}
@@ -460,7 +466,7 @@ public class TransactionManager {
try {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- ENTRY txid: " + transactionId +
- " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException);
+ " ignoreUnknownTransaction: " + ignoreUnknownTransaction);
Batch.Call<SsccRegionService, SsccCommitResponse> callable =
new Batch.Call<SsccRegionService, SsccCommitResponse>() {
ServerRpcController controller = new ServerRpcController();
@@ -473,7 +479,7 @@ public class TransactionManager {
builder.setTransactionId(transactionId);
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName))); //ByteString.copyFromUtf8(Bytes.toString(regionName)));
builder.setCommitId(commitId);
- builder.setIgnoreUnknownTransactionException(ignoreUnknownTransactionException);
+ builder.setIgnoreUnknownTransactionException(ignoreUnknownTransaction);
instance.commit(controller, builder.build(), rpcCallback);
return rpcCallback.get();
@@ -483,7 +489,7 @@ public class TransactionManager {
Map<byte[], SsccCommitResponse> result = null;
try {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- before coprocessorService txid: " + transactionId +
- " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException + " table: " +
+ " ignoreUnknownTransaction: " + ignoreUnknownTransaction + " table: " +
table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
result = table.coprocessorService(SsccRegionService.class, startKey, endKey, callable);
} catch (Throwable e) {
@@ -502,7 +508,7 @@ public class TransactionManager {
if(cresponse.getHasException()) {
String exceptionString = new String (cresponse.getException().toString());
if (exceptionString.contains("UnknownTransactionException")) {
- if (ignoreUnknownTransactionException == true) {
+ if (ignoreUnknownTransaction == true) {
if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring UnknownTransactionException in cresponse");
}
else {
@@ -520,12 +526,14 @@ public class TransactionManager {
}
}
catch (UnknownTransactionException ute) {
- LOG.error("Got unknown exception in doCommitX for transaction: " + transactionId + " " + ute);
+ LOG.error("Got unknown exception in doCommitX by participant " + participantNum
+ + " for transaction: " + transactionId + " " + ute);
transactionState.requestPendingCountDec(true);
throw new UnknownTransactionException();
}
catch (Exception e) {
- LOG.error("doCommitX retrying due to Exception: " + e);
+ LOG.error("doCommitX participant " + participantNum + " retrying transaction "
+ + transactionId + " due to Exception: " + e);
refresh = true;
retry = true;
}
@@ -590,7 +598,7 @@ public class TransactionManager {
* Return : Commit vote (yes, no, read only)
* Purpose : Call prepare for a given regionserver
*/
- public Integer doPrepareX(final byte[] regionName, final long transactionId, final TransactionRegionLocation location)
+ public Integer doPrepareX(final byte[] regionName, final long transactionId, final int participantNum, final TransactionRegionLocation location)
throws IOException, CommitUnsuccessfulException {
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- ENTRY txid: " + transactionId );
int commitStatus = 0;
@@ -613,6 +621,7 @@ public class TransactionManager {
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestRequest.Builder builder = CommitRequestRequest.newBuilder();
builder.setTransactionId(transactionId);
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName)));
+ builder.setParticipantNum(participantNum);
instance.commitRequest(controller, builder.build(), rpcCallback);
return rpcCallback.get();
@@ -708,11 +717,13 @@ public class TransactionManager {
}
}
catch(UnknownTransactionException ute) {
- LOG.warn("doPrepareX Exception: " + ute);
+ LOG.warn("doPrepareX participant " + participantNum + " transaction "
+ + transactionId + " unknown transaction : " + ute);
throw new UnknownTransactionException();
}
catch(Exception e) {
- LOG.error("doPrepareX retrying due to Exception: " + e);
+ LOG.error("doPrepareX participant " + participantNum + " retrying transaction "
+ + transactionId + " due to Exception: " + e);
refresh = true;
retry = true;
}
@@ -807,11 +818,13 @@ public class TransactionManager {
}
}
catch(UnknownTransactionException ute) {
- LOG.warn("doPrepareX Exception: " + ute);
+ LOG.warn("doPrepareX participant " + participantNum + " transaction "
+ + transactionId + " unknown transaction: " + ute);
throw new UnknownTransactionException();
}
catch(Exception e) {
- LOG.error("doPrepareX retrying due to Exception: " + e);
+ LOG.error("doPrepareX participant " + participantNum + " retrying transaction "
+ + transactionId + " due to Exception: " + e);
refresh = true;
retry = true;
}
@@ -907,10 +920,11 @@ public class TransactionManager {
* Return : Ignored
* Purpose : Call abort for a given regionserver
*/
- public Integer doAbortX(final byte[] regionName, final long transactionId) throws IOException{
- if(LOG.isTraceEnabled()) LOG.trace("doAbortX -- ENTRY txID: " + transactionId);
+ public Integer doAbortX(final byte[] regionName, final long transactionId, final int participantNum) throws IOException{
+ if(LOG.isDebugEnabled()) LOG.debug("doAbortX -- ENTRY txID: " + transactionId + " participantNum "
+ + participantNum + " region " + regionName.toString());
boolean retry = false;
- boolean refresh = false;
+ boolean refresh = false;
int retryCount = 0;
int retrySleep = TM_SLEEP;
@@ -928,8 +942,8 @@ public class TransactionManager {
public AbortTransactionResponse call(TrxRegionService instance) throws IOException {
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionRequest.Builder builder = AbortTransactionRequest.newBuilder();
builder.setTransactionId(transactionId);
+ builder.setParticipantNum(participantNum);
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName)));
-
instance.abortTransaction(controller, builder.build(), rpcCallback);
return rpcCallback.get();
}
@@ -937,9 +951,10 @@ public class TransactionManager {
Map<byte[], AbortTransactionResponse> result = null;
try {
- if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- before coprocessorService txid: " + transactionId + " table: " +
- table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
- result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
+ if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- before coprocessorService txid: "
+ + transactionId + " table: " + table.toString() + " startKey: "
+ + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
+ result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
} catch (Throwable e) {
String msg = "ERROR occurred while calling doAbortX coprocessor service";
LOG.error(msg + ":" + e);
@@ -947,15 +962,18 @@ public class TransactionManager {
}
if(result.size() == 0) {
- LOG.error("doAbortX, received 0 region results.");
- refresh = true;
- retry = true;
+ LOG.error("doAbortX, received 0 region results for transaction: " + transactionId
+ + " participantNum: " + participantNum + " region: " + Bytes.toString(regionName));
+ refresh = true;
+ retry = true;
}
else {
for (AbortTransactionResponse cresponse : result.values()) {
if(cresponse.getHasException()) {
String exceptionString = cresponse.getException().toString();
- LOG.error("Abort HasException true: " + exceptionString);
+ LOG.error("Abort of transaction: " + transactionId
+ + " participantNum: " + participantNum + " region: " + Bytes.toString(regionName)
+ + " threw Exception: " + exceptionString);
if(exceptionString.contains("UnknownTransactionException")) {
throw new UnknownTransactionException();
}
@@ -966,16 +984,18 @@ public class TransactionManager {
}
}
catch (UnknownTransactionException ute) {
- LOG.debug("UnknownTransactionException in doAbortX for transaction: " + transactionId + "(ignoring): " + ute);
- }
+ LOG.error("UnknownTransactionException in doAbortX for transaction: " + transactionId
+ + " participantNum: " + participantNum + " region: "
+ + Bytes.toString(regionName) + "(ignoring): " + ute ); }
catch (Exception e) {
- if(e.toString().contains("Asked to commit a non-pending transaction")) {
- LOG.error("doCommitX will not retry: " + e);
+ if(e.toString().contains("Asked to commit a non-pending transaction ")) {
+ LOG.error(" doCommitX will not retry transaction: " + transactionId + " : " + e);
refresh = false;
retry = false;
}
else {
- LOG.error("doAbortX retrying due to Exception: " + e );
+ LOG.error("doAbortX retrying transaction: " + transactionId + " participantNum: "
+ + participantNum + " region: " + Bytes.toString(regionName) + " due to Exception: " + e );
refresh = true;
retry = true;
}
@@ -1058,7 +1078,7 @@ public class TransactionManager {
for (SsccAbortTransactionResponse cresponse : result.values()) {
if(cresponse.getHasException()) {
String exceptionString = cresponse.getException().toString();
- LOG.error("Abort HasException true: " + exceptionString);
+ LOG.error("Abort of transaction: " + transactionId + " threw Exception: " + exceptionString);
if(exceptionString.contains("UnknownTransactionException")) {
throw new UnknownTransactionException();
}
@@ -1069,10 +1089,12 @@ public class TransactionManager {
}
}
catch (UnknownTransactionException ute) {
- LOG.debug("UnknownTransactionException in doAbortX for transaction: " + transactionId + "(ignoring): " + ute);
+ LOG.debug("UnknownTransactionException in doAbortX by participant " + participantNum +
+ " for transaction: " + transactionId + "(ignoring): " + ute);
}
catch (Exception e) {
- LOG.error("doAbortX retrying due to Exception: " + e );
+ LOG.error("doAbortX participant " + participantNum + " retrying transaction "
+ + transactionId + " due to Exception: " + e);
refresh = true;
retry = true;
}
@@ -1124,7 +1146,8 @@ public class TransactionManager {
return 0;
}
- public Integer doCommitX(final List<TransactionRegionLocation> locations, final long transactionId, final long commitId, final boolean ignoreUnknownTransactionException) throws CommitUnsuccessfulException, IOException {
+ public Integer doCommitX(final List<TransactionRegionLocation> locations, final long transactionId,
+ final long commitId, final int participantNum, final boolean ignoreUnknownTransaction) throws CommitUnsuccessfulException, IOException {
boolean retry = false;
boolean refresh = false;
@@ -1132,15 +1155,16 @@ public class TransactionManager {
do {
try {
- if (LOG.isTraceEnabled()) LOG.trace("doCommitX - Batch -- ENTRY txid: " + transactionId +
- " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException);
+ if (LOG.isTraceEnabled()) LOG.trace("doCommitX - Batch -- ENTRY txid: " + transactionId
+ + " participant " + participantNum + " ignoreUnknownTransaction: " + ignoreUnknownTransaction);
TrxRegionProtos.CommitMultipleRequest.Builder builder = CommitMultipleRequest.newBuilder();
builder.setTransactionId(transactionId);
+ builder.setParticipantNum(participantNum);
for(TransactionRegionLocation location : locations) {
builder.addRegionName(ByteString.copyFrom(location.getRegionInfo().getRegionName()));
}
- builder.setIgnoreUnknownTransactionException(ignoreUnknownTransactionException);
+ builder.setIgnoreUnknownTransactionException(ignoreUnknownTransaction);
CommitMultipleRequest commitMultipleRequest = builder.build();
CommitMultipleResponse commitMultipleResponse = null;
@@ -1170,7 +1194,7 @@ public class TransactionManager {
throw new UnknownTransactionException(errMsg);
}
catch (Exception e) {
- LOG.error("doCommitX retrying due to Exception: " + e);
+ LOG.error("doCommitX retrying transaction " + transactionId + " due to Exception: " + e);
refresh = true;
retry = true;
}
@@ -1206,9 +1230,10 @@ public class TransactionManager {
return 0;
}
- public Integer doPrepareX(final List<TransactionRegionLocation> locations, final long transactionId)
+ public Integer doPrepareX(final List<TransactionRegionLocation> locations, final long transactionId, final int participantNum)
throws IOException, CommitUnsuccessfulException {
- if (LOG.isTraceEnabled()) LOG.trace("doPrepareX - Batch -- ENTRY txid: " + transactionId );
+ if (LOG.isTraceEnabled()) LOG.trace("doPrepareX - Batch -- ENTRY txid: " + transactionId
+ + " participant " + participantNum );
boolean refresh = false;
boolean retry = false;
@@ -1219,6 +1244,7 @@ public class TransactionManager {
TrxRegionProtos.CommitRequestMultipleRequest.Builder builder = CommitRequestMultipleRequest.newBuilder();
builder.setTransactionId(transactionId);
+ builder.setParticipantNum(participantNum);
for(TransactionRegionLocation location : locations) {
builder.addRegionName(ByteString.copyFrom(location.getRegionInfo().getRegionName()));
}
@@ -1245,12 +1271,14 @@ public class TransactionManager {
}
}
catch(UnknownTransactionException ute) {
- String warnMsg = "doPrepareX Exception: " + ute;
+ String warnMsg = new String("UnknownTransaction in doPrepareX - Batch - by participant "
+ + participantNum + " for transaction " + transactionId + " " + ute);
LOG.warn(warnMsg);
throw new UnknownTransactionException(warnMsg);
}
catch(Exception e) {
- LOG.error("doPrepareX retrying due to Exception: " + e);
+ LOG.error("doPrepareX - Batch - retrying for participant "
+ + participantNum + " transaction " + transactionId + " due to Exception: " + e);
refresh = true;
retry = true;
}
@@ -1258,7 +1286,7 @@ public class TransactionManager {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -Batch- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
if(retryCount == RETRY_ATTEMPTS){
LOG.error("Exceeded retry attempts in doPrepareX: " + retryCount);
@@ -1269,14 +1297,14 @@ public class TransactionManager {
throw new CommitUnsuccessfulException("Exceeded retry attempts in doPrepareX: " + retryCount);
}
if (LOG.isWarnEnabled()) {
- LOG.warn("doPrepareX -- " + table.toString() + " location being refreshed");
- LOG.warn("doPrepareX -- lv_hri: " + lv_hri);
- LOG.warn("doPrepareX -- location.getRegionInfo(): " + location.getRegionInfo());
+ LOG.warn("doPrepareX -Batch- " + table.toString() + " location being refreshed");
+ LOG.warn("doPrepareX -Batch- lv_hri: " + lv_hri);
+ LOG.warn("doPrepareX -Batch- location.getRegionInfo(): " + location.getRegionInfo());
}
table.getRegionLocation(startKey, true);
- if (LOG.isDebugEnabled()) LOG.debug("doPrepareX retry count: " + retryCount);
- if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- setting retry, count: " + retryCount);
+ if (LOG.isDebugEnabled()) LOG.debug("doPrepareX -Batch- retry count: " + retryCount);
+ if (LOG.isTraceEnabled()) LOG.trace("doPrepareX --Batch-- setting retry, count: " + retryCount);
refresh = false;
retryCount++;
}
@@ -1342,7 +1370,7 @@ public class TransactionManager {
return TM_COMMIT_TRUE;
}
- public Integer doAbortX(final List<TransactionRegionLocation> locations, final long transactionId) throws IOException{
+ public Integer doAbortX(final List<TransactionRegionLocation> locations, final long transactionId, final int participantNum) throws IOException{
if(LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch -- ENTRY txID: " + transactionId);
boolean retry = false;
boolean refresh = false;
@@ -1351,6 +1379,7 @@ public class TransactionManager {
try {
TrxRegionProtos.AbortTransactionMultipleRequest.Builder builder = AbortTransactionMultipleRequest.newBuilder();
builder.setTransactionId(transactionId);
+ builder.setParticipantNum(participantNum);
for(TransactionRegionLocation location : locations) {
builder.addRegionName(ByteString.copyFrom(location.getRegionInfo().getRegionName()));
}
@@ -1362,7 +1391,7 @@ public class TransactionManager {
abortTransactionMultipleResponse = trxService.abortTransactionMultiple(null, abortTransactionMultipleRequest);
retry = false;
} catch (Throwable e) {
- LOG.error("doAbortX coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId + ":" + e);
+ LOG.error("doAbortX - Batch - coprocessor error for " + Bytes.toString(locations.iterator().next().getRegionInfo().getRegionName()) + " txid: " + transactionId + ":" + e);
refresh = true;
retry = true;
}
@@ -1375,10 +1404,12 @@ public class TransactionManager {
}
}
catch (UnknownTransactionException ute) {
- LOG.debug("UnknownTransactionException in doAbortX for transaction: " + transactionId + "(ignoring): " + ute);
+ LOG.debug("UnknownTransactionException in doAbortX - Batch - by participant " + participantNum
+ + " for transaction: " + transactionId + "(ignoring): " + ute);
}
catch (Exception e) {
- LOG.error("doAbortX retrying due to Exception: " + e );
+ LOG.error("doAbortX - Batch - participant " + participantNum + " retrying transaction "
+ + transactionId + " due to Exception: " + e);
refresh = true;
retry = true;
}
@@ -1386,20 +1417,22 @@ public class TransactionManager {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
- + Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
+ if (LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch - participant " + participantNum
+ + "-- location being refreshed : " + location.getRegionInfo().getRegionNameAsString()
+ + " endKey: " + Hex.encodeHexString(location.getRegionInfo().getEndKey())
+ + " for transaction: " + transactionId);
if(retryCount == RETRY_ATTEMPTS){
LOG.error("Exceeded retry attempts in doAbortX: " + retryCount + " (ingoring)");
}
if (LOG.isWarnEnabled()) {
- LOG.warn("doAbortX -- " + table.toString() + " location being refreshed");
- LOG.warn("doAbortX -- lv_hri: " + lv_hri);
- LOG.warn("doAbortX -- location.getRegionInfo(): " + location.getRegionInfo());
+ LOG.warn("doAbortX - Batch - -- " + table.toString() + " location being refreshed");
+ LOG.warn("doAbortX - Batch - -- lv_hri: " + lv_hri);
+ LOG.warn("doAbortX - Batch - -- location.getRegionInfo(): " + location.getRegionInfo());
}
table.getRegionLocation(startKey, true);
- if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- setting retry, count: " + retryCount);
+ if (LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch - -- setting retry, count: " + retryCount);
refresh = false;
retryCount++;
}
@@ -1601,9 +1634,10 @@ public class TransactionManager {
for(final Map.Entry<ServerName, List<TransactionRegionLocation>> entry : locations.entrySet()) {
loopCount++;
+ final int lv_participant = loopCount;
compPool.submit(new TransactionManagerCallable(transactionState, entry.getValue().iterator().next(), connection) {
public Integer call() throws CommitUnsuccessfulException, IOException {
- return doPrepareX(entry.getValue(), transactionState.getTransactionId());
+ return doPrepareX(entry.getValue(), transactionState.getTransactionId(), lv_participant);
}
});
}
@@ -1639,11 +1673,12 @@ public class TransactionManager {
if(transactionState.getRegionsRetryCount() > 0) {
for (TransactionRegionLocation location : transactionState.getRetryRegions()) {
loopCount++;
+ final int lvParticipantNum = loopCount;
compPool.submit(new TransactionManagerCallable(transactionState, location, connection) {
public Integer call() throws CommitUnsuccessfulException, IOException {
return doPrepareX(location.getRegionInfo().getRegionName(),
- transactionState.getTransactionId(),
+ transactionState.getTransactionId(), lvParticipantNum,
location);
}
});
@@ -1716,10 +1751,11 @@ public class TransactionManager {
loopCount++;
final TransactionRegionLocation myLocation = location;
final byte[] regionName = location.getRegionInfo().getRegionName();
+ final int lvParticipantNum = loopCount;
compPool.submit(new TransactionManagerCallable(transactionState, location, connection) {
public Integer call() throws IOException, CommitUnsuccessfulException {
- return doPrepareX(regionName, transactionState.getTransactionId(), myLocation);
+ return doPrepareX(regionName, transactionState.getTransactionId(), lvParticipantNum, myLocation);
}
});
}
@@ -1749,7 +1785,8 @@ public class TransactionManager {
}
} catch (Exception e) {
- LOG.error("exception in prepareCommit (during submit to pool): " + e);
+ LOG.error("exception in prepareCommit for transaction: "
+ + transactionState.getTransactionId() + " (during submit to pool): " + e);
throw new CommitUnsuccessfulException(e);
}
@@ -1778,8 +1815,9 @@ public class TransactionManager {
}
}
}catch (Exception e) {
- LOG.error("exception in prepareCommit (during completion processing): " + e);
- throw new CommitUnsuccessfulException(e);
+ LOG.error("exception in prepareCommit for transaction: "
+ + transactionState.getTransactionId() + " (during completion processing): " + e);
+ throw new CommitUnsuccessfulException(e);
}
if(commitError != 0)
return commitError;
@@ -1871,19 +1909,26 @@ public class TransactionManager {
+ ((EnvironmentEdgeManager.currentTimeMillis() - startTime)) + "]ms");
}
- public void retryCommit(final TransactionState transactionState, final boolean ignoreUnknownTransactionException) {
+ public void retryCommit(final TransactionState transactionState, final boolean ignoreUnknownTransaction) {
if(LOG.isTraceEnabled()) LOG.trace("retryCommit -- ENTRY -- txid: " + transactionState.getTransactionId());
synchronized(transactionState.getRetryRegions()) {
List<TransactionRegionLocation> completedList = new ArrayList<TransactionRegionLocation>();
final long commitIdVal = (TRANSACTION_ALGORITHM == AlgorithmType.SSCC) ? transactionState.getCommitId() : -1;
+ int loopCount = 0;
for (TransactionRegionLocation location : transactionState.getRetryRegions()) {
- if(LOG.isTraceEnabled()) LOG.trace("retryAbort retrying abort for: " + location.getRegionInfo().getRegionNameAsString());
+ loopCount++;
+ final int participantNum = loopCount;
+ if(LOG.isTraceEnabled()) LOG.trace("retryCommit retrying commit for transaction: "
+ + transactionState.getTransactionId() + ", participant: " + participantNum + ", region "
+ + location.getRegionInfo().getRegionNameAsString());
threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) {
public Integer call() throws CommitUnsuccessfulException, IOException {
return doCommitX(location.getRegionInfo().getRegionName(),
- transactionState.getTransactionId(), commitIdVal,
- ignoreUnknownTransactionException);
+ transactionState.getTransactionId(),
+ commitIdVal,
+ participantNum,
+ ignoreUnknownTransaction);
}
});
completedList.add(location);
@@ -1897,13 +1942,18 @@ public class TransactionManager {
if(LOG.isTraceEnabled()) LOG.trace("retryAbort -- ENTRY -- txid: " + transactionState.getTransactionId());
synchronized(transactionState.getRetryRegions()) {
List<TransactionRegionLocation> completedList = new ArrayList<TransactionRegionLocation>();
+ int loopCount = 0;
for (TransactionRegionLocation location : transactionState.getRetryRegions()) {
- if(LOG.isTraceEnabled()) LOG.trace("retryAbort retrying abort for: " + location.getRegionInfo().getRegionNameAsString());
+ loopCount++;
+ final int participantNum = loopCount;
+ if(LOG.isTraceEnabled()) LOG.trace("retryAbort retrying abort for transaction: "
+ + transactionState.getTransactionId() + ", participant: "
+ + participantNum + ", region: " + location.getRegionInfo().getRegionNameAsString());
threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) {
public Integer call() throws CommitUnsuccessfulException, IOException {
return doAbortX(location.getRegionInfo().getRegionName(),
- transactionState.getTransactionId());
+ transactionState.getTransactionId(), participantNum);
}
});
completedList.add(location);
@@ -1921,7 +1971,7 @@ public class TransactionManager {
public void doCommit(final TransactionState transactionState)
throws CommitUnsuccessfulException, UnsuccessfulDDLException {
if (LOG.isTraceEnabled()) LOG.trace("doCommit [" + transactionState.getTransactionId() +
- "] ignoreUnknownTransactionException not supplied");
+ "] ignoreUnknownTransaction not supplied");
doCommit(transactionState, false);
}
@@ -1929,16 +1979,16 @@ public class TransactionManager {
* Do the commit. This is the 2nd phase of the 2-phase protocol.
*
* @param transactionState
- * @param ignoreUnknownTransactionException
+ * @param ignoreUnknownTransaction
* @throws CommitUnsuccessfulException
*/
- public void doCommit(final TransactionState transactionState, final boolean ignoreUnknownTransactionException)
+ public void doCommit(final TransactionState transactionState, final boolean ignoreUnknownTransaction)
throws CommitUnsuccessfulException, UnsuccessfulDDLException {
int loopCount = 0;
if (batchRegionServer && (TRANSACTION_ALGORITHM == AlgorithmType.MVCC)) {
try {
- if (LOG.isTraceEnabled()) LOG.trace("Committing [" + transactionState.getTransactionId() +
- "] ignoreUnknownTransactionException: " + ignoreUnknownTransactionException);
+ if (LOG.isTraceEnabled()) LOG.trace("Committing [" + transactionState.getTransactionId() +
+ "] ignoreUnknownTransaction: " + ignoreUnknownTransaction);
// Set the commitId
transactionState.setCommitId(-1); // Dummy for MVCC
@@ -1957,24 +2007,24 @@ public class TransactionManager {
}
else {
regionList = locations.get(servername);
- }
+ }
regionList.add(location);
- }
+ }
for(final Map.Entry<ServerName, List<TransactionRegionLocation>> entry : locations.entrySet()) {
if (LOG.isTraceEnabled()) LOG.trace("sending commits ... [" + transactionState.getTransactionId() + "]");
loopCount++;
+ final int lv_participant = loopCount;
threadPool.submit(new TransactionManagerCallable(transactionState, entry.getValue().iterator().next(), connection) {
public Integer call() throws CommitUnsuccessfulException, IOException {
if (LOG.isTraceEnabled()) LOG.trace("before doCommit() [" + transactionState.getTransactionId() + "]" +
- " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException);
+ " ignoreUnknownTransaction: " + ignoreUnknownTransaction);
return doCommitX(entry.getValue(), transactionState.getTransactionId(),
- transactionState.getCommitId(), ignoreUnknownTransactionException);
+ transactionState.getCommitId(), lv_participant, ignoreUnknownTransaction);
}
});
}
-
} catch (Exception e) {
LOG.error("exception in doCommit for transaction: " + transactionState.getTransactionId() + " " + e);
// This happens on a NSRE that is triggered by a split
@@ -2000,7 +2050,7 @@ public class TransactionManager {
// non batch-rs
if (LOG.isTraceEnabled()) LOG.trace("Committing [" + transactionState.getTransactionId() +
- "] ignoreUnknownTransactionException: " + ignoreUnknownTransactionException);
+ "] ignoreUnknownTransaction: " + ignoreUnknownTransaction);
if (LOG.isTraceEnabled()) LOG.trace("sending commits for ts: " + transactionState);
try {
@@ -2015,28 +2065,26 @@ public class TransactionManager {
loopCount++;
final byte[] regionName = location.getRegionInfo().getRegionName();
-
+ final int participantNum = loopCount;
//TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
// .getHRegionConnection(location.getServerName());
threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) {
public Integer call() throws CommitUnsuccessfulException, IOException {
- if (LOG.isTraceEnabled()) LOG.trace("before doCommit() [" + transactionState.getTransactionId() + "]" +
- " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException);
- return doCommitX(regionName, transactionState.getTransactionId(), transactionState.getCommitId(), ignoreUnknownTransactionException);
+ if (LOG.isDebugEnabled()) LOG.debug("before doCommit() [" + transactionState.getTransactionId()
+ + "] participantNum " + participantNum + " ignoreUnknownTransaction: " + ignoreUnknownTransaction);
+ return doCommitX(regionName,
+ transactionState.getTransactionId(),
+ transactionState.getCommitId(),
+ participantNum,
+ ignoreUnknownTransaction);
}
});
}
} catch (Exception e) {
- LOG.error("exception in doCommit for transaction: " + transactionState.getTransactionId() + " " + e);
+ LOG.error("exception in doCommit for transaction: "
+ + transactionState.getTransactionId() + " " + e);
// This happens on a NSRE that is triggered by a split
-/* try {
- abort(transactionState);
- } catch (Exception abortException) {
-
- LOG.warn("Exeption during abort", abortException);
- }
-*/
throw new CommitUnsuccessfulException(e);
}
@@ -2212,12 +2260,6 @@ public class TransactionManager {
public void abort(final TransactionState transactionState) throws IOException, UnsuccessfulDDLException {
if(LOG.isTraceEnabled()) LOG.trace("Abort -- ENTRY txID: " + transactionState.getTransactionId());
int loopCount = 0;
- /*
- if(transactionState.getStatus().equals("ABORTED")) {
- if(LOG.isTraceEnabled()) LOG.trace("Abort --EXIT already called, ignoring");
- return;
- }
- */
transactionState.setStatus(TransState.STATE_ABORTED);
// (Asynchronously send aborts
@@ -2243,44 +2285,35 @@ public class TransactionManager {
}
for(final Map.Entry<ServerName, List<TransactionRegionLocation>> entry : locations.entrySet()) {
loopCount++;
+ final int participantNum = loopCount;
+
threadPool.submit(new TransactionManagerCallable(transactionState, entry.getValue().iterator().next(), connection) {
public Integer call() throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("before abort() [" + transactionState.getTransactionId() + "]");
- return doAbortX(entry.getValue(), transactionState.getTransactionId());
+ return doAbortX(entry.getValue(), transactionState.getTransactionId(), participantNum);
}
});
}
transactionState.completeSendInvoke(loopCount);
- /*
- if(transactionState.getRegionsRetryCount() > 0) {
- for (TransactionRegionLocation location : transactionState.getRetryRegions()) {
- loopCount++;
- threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) {
- public Integer call() throws CommitUnsuccessfulException, IOException {
-
- return doAbortX(location.getRegionInfo().getRegionName(),
- transactionState.getTransactionId());
- }
- });
- }
- }
- transactionState.clearRetryRegions();
- */
}
else {
-
+ loopCount = 0;
for (TransactionRegionLocation location : transactionState.getParticipatingRegions()) {
if (transactionState.getRegionsToIgnore().contains(location)) {
continue;
}
try {
loopCount++;
+ final int participantNum = loopCount;
final byte[] regionName = location.getRegionInfo().getRegionName();
+ if(LOG.isTraceEnabled()) LOG.trace("Submitting abort for transaction: "
+ + transactionState.getTransactionId() + ", participant: "
+ + participantNum + ", region: " + location.getRegionInfo().getRegionNameAsString());
threadPool.submit(new TransactionManagerCallable(transactionState, location, connection) {
public Integer call() throws IOException {
- return doAbortX(regionName, transactionState.getTransactionId());
+ return doAbortX(regionName, transactionState.getTransactionId(), participantNum);
}
});
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/84af9603/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
index 16153c3..9dd3b5a 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
@@ -451,8 +451,12 @@ CoprocessorService, Coprocessor {
try {
abortTransaction(transactionId);
} catch (UnknownTransactionException u) {
- if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + ", Caught UnknownTransactionException after internal abortTransaction call - " + u.getMessage() + " " + stackTraceToString(u));
- ute = u;
+ if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId "
+ + transactionId
+ + ", Caught UnknownTransactionException after internal abortTransaction call - "
+ + u.getMessage() + " "
+ + stackTraceToString(u));
+ ute = u;
} catch (IOException e) {
if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor:abort - txId " + transactionId + ", Caught IOException after internal abortTransaction call - " + e.getMessage() + " " + stackTraceToString(e));
ioe = e;
@@ -657,8 +661,10 @@ CoprocessorService, Coprocessor {
Throwable t = null;
WrongRegionException wre = null;
long transactionId = request.getTransactionId();
+ final int participantNum = request.getParticipantNum();
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commit - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString());
+ if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commit - txId "
+ + transactionId + ", participantNum " + participantNum + ", regionName " + regionInfo.getRegionNameAsString());
/* commenting out for the time being
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
@@ -676,7 +682,7 @@ CoprocessorService, Coprocessor {
{
// Process local memory
try {
- commit(transactionId, request.getIgnoreUnknownTransactionException());
+ commit(transactionId, participantNum, request.getIgnoreUnknownTransactionException());
} catch (Throwable e) {
LOG.error("TrxRegionEndpoint coprocessor: commit - txId " + transactionId + ", Caught exception after internal commit call "
+ e.getMessage() + " " + stackTraceToString(e));
@@ -737,7 +743,7 @@ CoprocessorService, Coprocessor {
commitMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString());
}
else {
- regionEPCP.commit(transactionId, request.getIgnoreUnknownTransactionException());
+ regionEPCP.commit(transactionId, i, request.getIgnoreUnknownTransactionException());
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitMultiple ends");
//commit(transactionId, request.getIgnoreUnknownTransactionException());
@@ -776,6 +782,7 @@ CoprocessorService, Coprocessor {
boolean reply = false;
long transactionId = request.getTransactionId();
+ final int participantNum = request.getParticipantNum();
Throwable t = null;
WrongRegionException wre = null;
@@ -795,8 +802,8 @@ CoprocessorService, Coprocessor {
{
// Process local memory
try {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", regionName, " + regionInfo.getRegionNameAsString() + "calling internal commitIfPossible");
- reply = commitIfPossible(transactionId);
+ if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", regionName, " + regionInfo.getRegionNameAsString() + "calling internal commitIfPossible");
+ reply = commitIfPossible(transactionId, participantNum);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", Caught exception after internal commitIfPossible call "
+ e.getMessage() + " " + stackTraceToString(e));
@@ -837,8 +844,10 @@ CoprocessorService, Coprocessor {
Throwable t = null;
WrongRegionException wre = null;
long transactionId = request.getTransactionId();
+ int participantNum = request.getParticipantNum();
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", regionName " + regionInfo.getRegionNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId "
+ + transactionId + ", participantNum " + participantNum + ", regionName " + regionInfo.getRegionNameAsString());
/* commenting out for the time being
java.lang.String name = ((com.google.protobuf.ByteString) request.getRegionName()).toStringUtf8();
@@ -856,7 +865,7 @@ CoprocessorService, Coprocessor {
{
// Process local memory
try {
- status = commitRequest(transactionId);
+ status = commitRequest(transactionId, participantNum);
} catch (UnknownTransactionException u) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString());
ute = u;
@@ -957,8 +966,8 @@ CoprocessorService, Coprocessor {
commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString());
}
else {
- if (i == (numOfRegion - 1)) {status = regionEPCP.commitRequest(transactionId, true);} // only the last region flush
- else {status = regionEPCP.commitRequest(transactionId, false);}
+ if (i == (numOfRegion - 1)) {status = regionEPCP.commitRequest(transactionId, i, true);} // only the last region flush
+ else {status = regionEPCP.commitRequest(transactionId, i, false);}
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitRequestMultiple ends");
//status = commitRequest(transactionId);
@@ -4225,8 +4234,8 @@ CoprocessorService, Coprocessor {
* @param long TransactionId
* @throws IOException
*/
- public void commit(final long transactionId) throws IOException {
- commit(transactionId, false /* IgnoreUnknownTransactionException */);
+ public void commit(final long transactionId, final int participantNum) throws IOException {
+ commit(transactionId, participantNum, false /* IgnoreUnknownTransactionException */);
}
/**
@@ -4235,9 +4244,9 @@ CoprocessorService, Coprocessor {
* @param boolean ignoreUnknownTransactionException
* @throws IOException
*/
- public void commit(final long transactionId, final boolean ignoreUnknownTransactionException) throws IOException {
+ public void commit(final long transactionId, final int participantNum, final boolean ignoreUnknownTransactionException) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commit(txId) -- ENTRY txId: " + transactionId +
- " ignoreUnknownTransactionException: " + ignoreUnknownTransactionException);
+ " ignoreUnknownTransaction: " + ignoreUnknownTransactionException);
CommitProgress commitStatus = CommitProgress.NONE;
TrxTransactionState state;
try {
@@ -4249,10 +4258,11 @@ CoprocessorService, Coprocessor {
+ m_Region.getRegionInfo().getRegionNameAsString());
return;
}
- LOG.fatal("TrxRegionEndpoint coprocessor: Asked to commit unknown transaction: " + transactionId
- + " in region "
- + m_Region.getRegionInfo().getRegionNameAsString());
- throw new IOException("UnknownTransactionException, transId: " + transactionId);
+ LOG.fatal("TrxRegionEndpoint coprocessor: Participant " + participantNum
+ + " Asked to commit unknown transaction: " + transactionId
+ + " in region " + m_Region.getRegionInfo().getRegionNameAsString());
+ throw new IOException("UnknownTransactionException, Participant "
+ + participantNum + " transId: " + transactionId);
}
if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
@@ -4296,13 +4306,15 @@ CoprocessorService, Coprocessor {
* @return TransactionRegionInterface commit code
* @throws IOException
*/
- public int commitRequest(final long transactionId) throws IOException {
- return commitRequest(transactionId, true);
+ public int commitRequest(final long transactionId, final int participantNum) throws IOException {
+ return commitRequest(transactionId, participantNum, true);
}
- public int commitRequest(final long transactionId, boolean flushHLOG) throws IOException {
+ public int commitRequest(final long transactionId, final int participantNum, boolean flushHLOG) throws IOException,
+ UnknownTransactionException {
long txid = 0;
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest -- ENTRY txId: " + transactionId);
+ if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest -- ENTRY txId: "
+ + transactionId + " participantNum " + participantNum);
TrxTransactionState state;
int lv_totalCommits = 0;
@@ -4326,10 +4338,10 @@ CoprocessorService, Coprocessor {
try {
state = getTransactionState(transactionId);
} catch (UnknownTransactionException e) {
- if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest Unknown transaction [" + transactionId
- + "] in region ["
+ if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest Unknown transaction ["
+ + transactionId + "] in region ["
+ m_Region.getRegionInfo().getRegionNameAsString()
- + "], ignoring");
+ + "], participantNum " + participantNum + " ignoring");
state = null;
}
// may change to indicate a NOTFOUND case then depends on the TM ts state, if reinstated tx, ignore the exception
@@ -4693,18 +4705,18 @@ CoprocessorService, Coprocessor {
* @return boolean
* @throws IOException
*/
- public boolean commitIfPossible(final long transactionId)
+ public boolean commitIfPossible(final long transactionId, final int participantNum)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: "
+ transactionId);
- int status = commitRequest(transactionId);
+ int status = commitRequest(transactionId, participantNum);
if (status == COMMIT_OK) {
// Process local memory
try {
- commit(transactionId);
+ commit(transactionId, participantNum);
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: " + transactionId + " COMMIT_OK");
return true;
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/84af9603/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
index 8b40e88..71a8a49 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
@@ -1516,15 +1516,15 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
return true;
@@ -2701,15 +2701,15 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasRegionName()) {
-
+
return false;
}
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasCommitId()) {
-
+
return false;
}
return true;
@@ -6428,23 +6428,23 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
if (!hasRow()) {
-
+
return false;
}
if (!hasFamily()) {
-
+
return false;
}
if (!hasQualifier()) {
@@ -8230,39 +8230,39 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
if (!hasRow()) {
-
+
return false;
}
if (!hasFamily()) {
-
- return false;
+
+ return false;
}
if (!hasQualifier()) {
-
+
return false;
}
if (!hasValue()) {
-
+
return false;
}
if (!hasPut()) {
-
+
return false;
}
if (!getPut().isInitialized()) {
-
+
return false;
}
return true;
@@ -11006,20 +11006,20 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
for (int i = 0; i < getDeleteCount(); i++) {
if (!getDelete(i).isInitialized()) {
-
+
return false;
}
}
@@ -12745,22 +12745,23 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
if (!hasDelete()) {
-
+
return false;
}
if (!getDelete().isInitialized()) {
+
return false;
}
return true;
@@ -14362,23 +14363,23 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
if (!hasGet()) {
-
+
return false;
}
if (!getGet().isInitialized()) {
-
+
return false;
}
return true;
@@ -15899,23 +15900,23 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
if (!hasScan()) {
-
+
return false;
}
if (!getScan().isInitialized()) {
-
+
return false;
}
return true;
@@ -17458,31 +17459,31 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
if (!hasScannerId()) {
-
+
return false;
}
if (!hasNumberOfRows()) {
-
+
return false;
}
if (!hasCloseScanner()) {
-
+
return false;
}
if (!hasNextCallSeq()) {
-
+
return false;
}
return true;
@@ -19503,23 +19504,23 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
if (!hasPut()) {
-
+
return false;
}
if (!getPut().isInitialized()) {
-
+
return false;
}
return true;
@@ -21247,20 +21248,20 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
for (int i = 0; i < getPutCount(); i++) {
if (!getPut(i).isInitialized()) {
-
+
return false;
}
}
@@ -22988,19 +22989,19 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasRegionName()) {
-
+
return false;
}
if (!hasTmId()) {
-
+
return false;
}
return true;
@@ -24568,27 +24569,27 @@ public final class SsccRegionProtos {
public final boolean isInitialized() {
if (!hasRegionName()) {
-
+
return false;
}
if (!hasTransactionId()) {
-
+
return false;
}
if (!hasStartId()) {
-
+
return false;
}
if (!hasInterpreterClassName()) {
-
+
return false;
}
if (!hasScan()) {
-
+
return false;
}
if (!getScan().isInitialized()) {
-
+
return false;
}
return true;