You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2016/02/02 18:07:16 UTC

[3/5] incubator-trafodion git commit: Fix for [TRAFODION-1703] Lower overhead in deleting old Tlog entries

Fix for [TRAFODION-1703]
Lower overhead in deleting old Tlog entries


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/7f54aa7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/7f54aa7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/7f54aa7c

Branch: refs/heads/master
Commit: 7f54aa7cddc5c57d71d3aa5a59ce2046b67865cd
Parents: 02c6c32
Author: Sean Broeder <sb...@edev03.esgyn.local>
Authored: Tue Jan 26 13:40:38 2016 +0000
Committer: Sean Broeder <sb...@edev03.esgyn.local>
Committed: Wed Jan 27 03:57:25 2016 +0000

----------------------------------------------------------------------
 .../transactional/TrxRegionEndpoint.java        |  208 +-
 .../generated/TrxRegionProtos.java              | 2182 +++++++++++++++++-
 .../hbase-trx/src/main/protobuf/TrxRegion.proto |   17 +
 .../java/org/trafodion/dtm/TmAuditTlog.java     |  350 ++-
 4 files changed, 2606 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/7f54aa7c/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
index 804afdf..e8e6038 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
@@ -61,6 +61,7 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
+import java.util.StringTokenizer;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.UUID;
@@ -207,6 +208,8 @@ import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProt
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalResponse;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteResponse;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateRequest;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionalAggregateResponse;
 import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
@@ -2145,6 +2148,206 @@ CoprocessorService, Coprocessor {
     done.run(presponse);
   }
 
+  public void deleteTlogEntries(RpcController controller,
+           TlogDeleteRequest request, RpcCallback<TlogDeleteResponse> done) {
+     boolean hasMore = true;
+     RegionScanner scanner = null;
+     Throwable t = null;
+     ScannerTimeoutException ste = null;
+     OutOfOrderProtocolException oop = null;
+     OutOfOrderScannerNextException ooo = null;
+     UnknownScannerException use = null;
+     MemoryUsageException mue = null;
+     WrongRegionException wre = null;
+     Exception ne = null;
+     Scan scan = null;
+     List<Cell> cellResults = new ArrayList<Cell>();
+     org.apache.hadoop.hbase.client.Result result = null;
+     long transId  = request.getTransactionId();
+     long lvAsn = request.getAuditSeqNum();
+     boolean lvAgeCommitted = request.getAgeCommitted();
+     try{
+        scan = ProtobufUtil.toScan(request.getScan());
+        prepareScanner(scan);
+        scanner = getScanner(transId, scan);
+     }
+     catch (Exception e){
+        if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries Exception in region: "
+           + regionInfo.getRegionNameAsString() + " getting scanner " + e );
+     }
+
+     long count = 0L;
+     boolean shouldContinue = true;
+
+     if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries ENTRY.  Records older than " + lvAsn
+         + " will be deleted in region: " + regionInfo.getRegionNameAsString());
+
+     // There should be a matching key in the transactionsById map
+     // associated with this transaction id.  If there is not
+     // one, then the initial openScanner call for the transaction
+     // id was not called.  This is a protocol error requiring
+     // openScanner, performScan followed by a closeScanner.
+
+     String key = getTransactionalUniqueId(transId);
+     boolean keyFound = transactionsById.containsKey(key);
+
+     if (keyFound != true){
+        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - Unknown transaction ["
+           + transId + "] in region [" + m_Region.getRegionInfo().getRegionNameAsString()
+           + "], will create an OutOfOrderProtocol exception ");
+        oop = new OutOfOrderProtocolException("deleteTlogEntries does not have an active transaction with an open scanner, txId: " + transId);
+     }
+
+     if (oop == null) {
+        try {
+
+           if (scanner != null){
+              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+                       + transId + ", scanner is not null");
+              while (shouldContinue) {
+                 hasMore = scanner.next(cellResults);
+                 result = Result.create(cellResults);
+                 if (!result.isEmpty()) {
+                    for (Cell cell : result.rawCells()) {
+                       String valueString = new String(CellUtil.cloneValue(cell));
+                       StringTokenizer st = new StringTokenizer(valueString, ",");
+                       if (st.hasMoreElements()) {
+                          String asnToken = st.nextElement().toString();
+                          String transidToken = st.nextElement().toString();
+                          String stateToken = st.nextElement().toString();
+                          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries transidToken: "
+                                    + transidToken + " asnToken: " + asnToken);
+                          if (Long.parseLong(asnToken) < lvAsn) {
+                             if ( (stateToken.contains("FORGOTTEN")) ||
+                                  (stateToken.equals("COMMITTED") && (lvAgeCommitted)) ||
+                                  (stateToken.equals("ABORTED") && (lvAgeCommitted))) {
+
+                                if (LOG.isTraceEnabled()) LOG.trace("Deleting transid: " + transidToken
+                                        + " from region: " + m_Region.getRegionInfo().getRegionNameAsString() + " with state: " + stateToken);
+
+                                try {
+                                   Delete d = new Delete(result.getRow());
+                                   m_Region.delete(d);
+                                }
+                                catch (Exception e) {
+                                   LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries -"
+                                       + " txId " + transidToken + ", Executing delete caught an exception " + e);
+                                   throw new IOException(e.toString());
+                                }
+                                count++;
+                             }
+                          } else {
+                             if (LOG.isTraceEnabled()) LOG.trace("deleteTlogEntries Ending scan at asn: " + asnToken
+                                     + ", transid: " + transidToken +
+                                     " because it is not less than the comparator: " + lvAsn +
+                                     " in region: " + m_Region.getRegionInfo().getRegionNameAsString());
+                             shouldContinue = false;
+                             break;
+                          }
+                       } // if (st.hasMoreElements()
+                    } // for (Cell cell : result.rawCells()
+                 } // if (!result.isEmpty()
+                 cellResults.clear();
+
+                 if (!hasMore){
+                    shouldContinue = false;
+                 }
+              } // while (shouldContinue)
+              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+                      + transId + ", count is " + count + ", hasMore is " + hasMore
+                      + ", result " + result.isEmpty());
+           }
+           else {
+              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+                      + transId + ", scanner is null");
+           }
+        } catch(OutOfOrderScannerNextException ooone) {
+           if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+                    + transId + " Caught OutOfOrderScannerNextException "
+                    + ooone.getMessage() + " " + stackTraceToString(ooone));
+           ooo = ooone;
+        } catch(ScannerTimeoutException cste) {
+           if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+                    + transId + " Caught ScannerTimeoutException "
+                    + cste.getMessage() + " " + stackTraceToString(cste));
+           ste = cste;
+        } catch(Throwable e) {
+           if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+                    + transId + " Caught throwable exception "
+                    + e.getMessage() + " " + stackTraceToString(e));
+           t = e;
+        }
+        if (scanner != null) {
+           try {
+              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - txId "
+                  + transId + ", closing the scanner, region is " + regionInfo.getRegionNameAsString());
+              scanner.close();
+           } catch(Exception e) {
+              if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries -  transaction id "
+                   + transId + ", Caught general exception " + e.getMessage() + " " + stackTraceToString(e));
+              ne = e;
+           }
+        }
+     }
+
+     org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteResponse.Builder deleteResponseBuilder = TlogDeleteResponse.newBuilder();
+     deleteResponseBuilder.setCount(count);
+     deleteResponseBuilder.setHasException(false);
+
+     if (t != null){
+        deleteResponseBuilder.setHasException(true);
+        deleteResponseBuilder.setException(t.toString());
+     }
+
+     if (ste != null){
+        deleteResponseBuilder.setHasException(true);
+        deleteResponseBuilder.setException(ste.toString());
+     }
+
+     if (wre != null){
+        deleteResponseBuilder.setHasException(true);
+        deleteResponseBuilder.setException(wre.toString());
+     }
+
+     if (ne != null){
+        deleteResponseBuilder.setHasException(true);
+        deleteResponseBuilder.setException(ne.toString());
+     }
+
+     if (ooo != null){
+        deleteResponseBuilder.setHasException(true);
+        deleteResponseBuilder.setException(ooo.toString());
+     }
+
+     if (use != null){
+        deleteResponseBuilder.setHasException(true);
+        deleteResponseBuilder.setException(use.toString());
+     }
+
+     if (oop != null){
+        if (this.suppressOutOfOrderProtocolException == false){
+           deleteResponseBuilder.setHasException(true);
+           deleteResponseBuilder.setException(oop.toString());
+           LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries - OutOfOrderProtocolException, transaction was not found, txId: "
+                 + transId + ", return exception" + ", regionName " + regionInfo.getRegionNameAsString());
+        }
+        else{
+           LOG.warn("TrxRegionEndpoint coprocessor: deleteTlogEntries - suppressing OutOfOrderProtocolException, transaction was not found, txId: "
+                + transId + ", regionName " + regionInfo.getRegionNameAsString());
+        }
+     }
+
+     if (mue != null){
+        if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: deleteTlogEntries - performing memoryPercentage "
+            + memoryPercentage + ", posting memory usage exceeds indicated percentage");
+        deleteResponseBuilder.setHasException(true);
+        deleteResponseBuilder.setException(mue.toString());
+     }
+
+     TlogDeleteResponse TlogDel_response = deleteResponseBuilder.build();
+     done.run(TlogDel_response);
+  }
+
   @Override
   public void put(RpcController controller,
                   PutTransactionalRequest request,
@@ -4607,7 +4810,7 @@ CoprocessorService, Coprocessor {
          state.setSequenceNumber(nextSequenceId.getAndIncrement());
          commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(), state);
          if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: Transaction " + transactionId
-               + " found in region " + m_Region.getRegionInfo().getRegionNameAsString()
+               + " found in region " + lv_regionName
                + ". Adding to commitedTransactionsBySequenceNumber for sequence number " + state.getSequenceNumber());
       }
       commitCheckEndTime = putBySequenceEndTime = System.nanoTime();
@@ -4767,7 +4970,8 @@ CoprocessorService, Coprocessor {
     if(state.getSplitRetry())
       return COMMIT_RESEND;
     retireTransaction(state, true);
-    if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest READ ONLY -- EXIT txId: " + transactionId);
+    if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest READ ONLY for participant "
+       + participantNum + " -- EXIT txId: " + transactionId + ", regionName " + regionInfo.getRegionNameAsString());
     return COMMIT_OK_READ_ONLY;
   }