You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2016/02/02 18:07:16 UTC
[3/5] incubator-trafodion git commit: Fix for [TRAFODION-1703] Lower
overhead in deleting old Tlog entries
Fix for [TRAFODION-1703]
Lower overhead in deleting old Tlog entries
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/7f54aa7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/7f54aa7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/7f54aa7c
Branch: refs/heads/master
Commit: 7f54aa7cddc5c57d71d3aa5a59ce2046b67865cd
Parents: 02c6c32
Author: Sean Broeder <sb...@edev03.esgyn.local>
Authored: Tue Jan 26 13:40:38 2016 +0000
Committer: Sean Broeder <sb...@edev03.esgyn.local>
Committed: Wed Jan 27 03:57:25 2016 +0000
----------------------------------------------------------------------
.../transactional/TrxRegionEndpoint.java | 208 +-
.../generated/TrxRegionProtos.java | 2182 +++++++++++++++++-
.../hbase-trx/src/main/protobuf/TrxRegion.proto | 17 +
.../java/org/trafodion/dtm/TmAuditTlog.java | 350 ++-
4 files changed, 2606 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7f54aa7c/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
index 804afdf..e8e6038 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
@@ -61,6 +61,7 @@ import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
+import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
@@ -207,6 +208,8 @@ import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProt
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
@@ -2145,6 +2148,206 @@ CoprocessorService, Coprocessor {
done.run(presponse);
}
+ public void deleteTlogEntries(RpcController controller,
+ TlogDeleteRequest request, RpcCallback<TlogDeleteResponse> done) {
+ boolean hasMore = true;
+ RegionScanner scanner = null;
+ Throwable t = null;
+ ScannerTimeoutException ste = null;
+ OutOfOrderProtocolException oop = null;
+ OutOfOrderScannerNextException ooo = null;
+ UnknownScannerException use = null;
+ MemoryUsageException mue = null;
+ WrongRegionException wre = null;
+ Exception ne = null;
+ Scan scan = null;
+ List<Cell> cellResults = new ArrayList<Cell>();
+ org.apache.hadoop.hbase.client.Result result = null;
+ long transId = request.getTransactionId();
+ long lvAsn = request.getAuditSeqNum();
+ boolean lvAgeCommitted = request.getAgeCommitted();
+ try{
+ scan = ProtobufUtil.toScan(request.getScan());
+ prepareScanner(scan);
+ scanner = getScanner(transId, scan);
+ }
+ catch (Exception e){
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries Exception in region: "
+ + regionInfo.getRegionNameAsString() + " getting scanner " + e );
+ }
+
+ long count = 0L;
+ boolean shouldContinue = true;
+
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries ENTRY. Records older than " + lvAsn
+ + " will be deleted in region: " + regionInfo.getRegionNameAsString());
+
+ // There should be a matching key in the transactionsById map
+ // associated with this transaction id. If there is not
+ // one, then the initial openScanner call for the transaction
+ // id was not called. This is a protocol error requiring
+ // openScanner, performScan followed by a closeScanner.
+
+ String key = getTransactionalUniqueId(transId);
+ boolean keyFound = transactionsById.containsKey(key);
+
+ if (keyFound != true){
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - Unknown transaction ["
+ + transId + "] in region [" + m_Region.getRegionInfo().getRegionNameAsString()
+ + "], will create an OutOfOrderProtocol exception ");
+ oop = new OutOfOrderProtocolException("deleteTlogEntries does not have an active transaction with an open scanner, txId: " + transId);
+ }
+
+ if (oop == null) {
+ try {
+
+ if (scanner != null){
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+ + transId + ", scanner is not null");
+ while (shouldContinue) {
+ hasMore = scanner.next(cellResults);
+ result = Result.create(cellResults);
+ if (!result.isEmpty()) {
+ for (Cell cell : result.rawCells()) {
+ String valueString = new String(CellUtil.cloneValue(cell));
+ StringTokenizer st = new StringTokenizer(valueString, ",");
+ if (st.hasMoreElements()) {
+ String asnToken = st.nextElement().toString();
+ String transidToken = st.nextElement().toString();
+ String stateToken = st.nextElement().toString();
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries transidToken: "
+ + transidToken + " asnToken: " + asnToken);
+ if (Long.parseLong(asnToken) < lvAsn) {
+ if ( (stateToken.contains("FORGOTTEN")) ||
+ (stateToken.equals("COMMITTED") && (lvAgeCommitted)) ||
+ (stateToken.equals("ABORTED") && (lvAgeCommitted))) {
+
+ if (LOG.isTraceEnabled()) LOG.trace("Deleting transid: " + transidToken
+ + " from region: " + m_Region.getRegionInfo().getRegionNameAsString() + " with state: " + stateToken);
+
+ try {
+ Delete d = new Delete(result.getRow());
+ m_Region.delete(d);
+ }
+ catch (Exception e) {
+ LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries -"
+ + " txId " + transidToken + ", Executing delete caught an exception " + e);
+ throw new IOException(e.toString());
+ }
+ count++;
+ }
+ } else {
+ if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries Ending scan at asn: " + asnToken
+ + ", transid: " + transidToken +
+ " because it is not less than the comparator: " + lvAsn +
+ " in region: " + m_Region.getRegionInfo().getRegionNameAsString());
+ shouldContinue = false;
+ break;
+ }
+ } // if (st.hasMoreElements()
+ } // for (Cell cell : result.rawCells()
+ } // if (!result.isEmpty()
+ cellResults.clear();
+
+ if (!hasMore){
+ shouldContinue = false;
+ }
+ } // while (shouldContinue)
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+ + transId + ", count is " + count + ", hasMore is " + hasMore
+ + ", result " + result.isEmpty());
+ }
+ else {
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+ + transId + ", scanner is null");
+ }
+ } catch(OutOfOrderScannerNextException ooone) {
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+ + transId + " Caught OutOfOrderScannerNextException "
+ + ooone.getMessage() + " " + stackTraceToString(ooone));
+ ooo = ooone;
+ } catch(ScannerTimeoutException cste) {
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+ + transId + " Caught ScannerTimeoutException "
+ + cste.getMessage() + " " + stackTraceToString(cste));
+ ste = cste;
+ } catch(Throwable e) {
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+ + transId + " Caught throwable exception "
+ + e.getMessage() + " " + stackTraceToString(e));
+ t = e;
+ }
+ if (scanner != null) {
+ try {
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+ + transId + ", closing the scanner, region is " + regionInfo.getRegionNameAsString());
+ scanner.close();
+ } catch(Exception e) {
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - transaction id "
+ + transId + ", Caught general exception " + e.getMessage() + " " + stackTraceToString(e));
+ ne = e;
+ }
+ }
+ }
+
+ org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteResponse.Builder deleteResponseBuilder = TlogDeleteResponse.newBuilder();
+ deleteResponseBuilder.setCount(count);
+ deleteResponseBuilder.setHasException(false);
+
+ if (t != null){
+ deleteResponseBuilder.setHasException(true);
+ deleteResponseBuilder.setException(t.toString());
+ }
+
+ if (ste != null){
+ deleteResponseBuilder.setHasException(true);
+ deleteResponseBuilder.setException(ste.toString());
+ }
+
+ if (wre != null){
+ deleteResponseBuilder.setHasException(true);
+ deleteResponseBuilder.setException(wre.toString());
+ }
+
+ if (ne != null){
+ deleteResponseBuilder.setHasException(true);
+ deleteResponseBuilder.setException(ne.toString());
+ }
+
+ if (ooo != null){
+ deleteResponseBuilder.setHasException(true);
+ deleteResponseBuilder.setException(ooo.toString());
+ }
+
+ if (use != null){
+ deleteResponseBuilder.setHasException(true);
+ deleteResponseBuilder.setException(use.toString());
+ }
+
+ if (oop != null){
+ if (this.suppressOutOfOrderProtocolException == false){
+ deleteResponseBuilder.setHasException(true);
+ deleteResponseBuilder.setException(oop.toString());
+ LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries - OutOfOrderProtocolException, transaction was not found, txId: "
+ + transId + ", return exception" + ", regionName " + regionInfo.getRegionNameAsString());
+ }
+ else{
+ LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries - suppressing OutOfOrderProtocolException, transaction was not found, txId: "
+ + transId + ", regionName " + regionInfo.getRegionNameAsString());
+ }
+ }
+
+ if (mue != null){
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - performing memoryPercentage "
+ + memoryPercentage + ", posting memory usage exceeds indicated percentage");
+ deleteResponseBuilder.setHasException(true);
+ deleteResponseBuilder.setException(mue.toString());
+ }
+
+ TlogDeleteResponse TlogDel_response = deleteResponseBuilder.build();
+ done.run(TlogDel_response);
+ }
+
@Override
public void put(RpcController controller,
PutTransactionalRequest request,
@@ -4607,7 +4810,7 @@ CoprocessorService, Coprocessor {
state.setSequenceNumber(nextSequenceId.getAndIncrement());
commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(), state);
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: Transaction " + transactionId
- + " found in region " + m_Region.getRegionInfo().getRegionNameAsString()
+ + " found in region " + lv_regionName
+ ". Adding to commitedTransactionsBySequenceNumber for sequence number " + state.getSequenceNumber());
}
commitCheckEndTime = putBySequenceEndTime = System.nanoTime();
@@ -4767,7 +4970,8 @@ CoprocessorService, Coprocessor {
if(state.getSplitRetry())
return COMMIT_RESEND;
retireTransaction(state, true);
- if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest READ ONLY -- EXIT txId: " + transactionId);
+ if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest READ ONLY for participant "
+ + participantNum + " -- EXIT txId: " + transactionId + ", regionName " + regionInfo.getRegionNameAsString());
return COMMIT_OK_READ_ONLY;
}