You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2016/07/18 22:40:57 UTC

[5/8] incubator-trafodion git commit: Cherry-pick changes for JIRA 2095

Cherry-pick changes for JIRA 2095


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/b452921f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/b452921f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/b452921f

Branch: refs/heads/master
Commit: b452921fe23a7f6c1cebc8dd670fbcc98a7e88d9
Parents: c74e3d6 cc51150
Author: Sean Broeder <sb...@edev06.esgyn.local>
Authored: Thu Jul 7 03:50:39 2016 +0000
Committer: Sean Broeder <sb...@edev06.esgyn.local>
Committed: Sun Jul 17 03:47:31 2016 +0000

----------------------------------------------------------------------
 .../hbase/client/transactional/RMInterface.java | 162 +------------------
 .../hbase/client/transactional/TmDDL.java       |   4 +-
 .../transactional/TransactionManager.java       | 142 +++++++++++++++-
 .../java/org/trafodion/dtm/HBaseTxClient.java   |   3 +
 core/sql/regress/executor/EXPECTED022.SB        |  16 +-
 docs/shared/license.txt                         |   5 +
 docs/shared/revisions.txt                       |   4 +-
 docs/src/site/markdown/download.md              |   8 +-
 docs/src/site/markdown/index.md                 |   8 +-
 docs/src/site/markdown/release-notes-2-0-1.md   |  10 +-
 docs/src/site/resources/css/site.css            |   1 +
 docs/src/site/site.xml                          |  12 +-
 12 files changed, 181 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
----------------------------------------------------------------------
diff --cc core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
index fe98284,df74a45..dc3e140
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
@@@ -73,18 -68,6 +68,13 @@@ import org.apache.hadoop.hbase.regionse
  import org.apache.hadoop.hbase.regionserver.transactional.IdTmException;
  import org.apache.hadoop.hbase.regionserver.transactional.IdTmId;
  
- import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
- import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest;
- import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse;
- 
 +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 +import org.apache.hadoop.hbase.ipc.ServerRpcController;
 +
 +import org.apache.zookeeper.KeeperException;
 +
 +import com.google.protobuf.ByteString;
 +
- import java.util.ArrayList;
  import java.util.Map;
  import java.util.HashMap;
  import java.util.Iterator;
@@@ -372,20 -222,12 +229,12 @@@ public class RMInterface 
      }
  
      public void createTable(HTableDescriptor desc, byte[][] keys, int numSplits, int keyLength, long transID) throws IOException {
 -
 -        if (LOG.isTraceEnabled()) LOG.trace("createTable ENTER: ");
 -            byte[] lv_byte_desc = desc.toByteArray();
 -            byte[] lv_byte_tblname = desc.getNameAsString().getBytes();
 -            if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc));
 -            createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname);
 +    	if (LOG.isTraceEnabled()) LOG.trace("Enter createTable, txid: " + transID + " Table: " + desc.getNameAsString());
 +        byte[] lv_byte_desc = desc.toByteArray();
 +        byte[] lv_byte_tblname = desc.getNameAsString().getBytes();
 +        if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc));
 +        createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname);
-         TransactionState ts = mapTransactionStates.get(transID);
-         if (LOG.isTraceEnabled()) LOG.trace("createTable: pushing epoch into regions for : " + desc.getNameAsString());
-         if (ts == null){
-            if (LOG.isTraceEnabled()) LOG.trace("pushing epoch into regions but unable to get ts object for transID : " + transID);
-            throw new IOException("createTable push epoch exception for table " + desc.getNameAsString());
-         }
-         pushRegionEpoch(desc, ts);
-         if (LOG.isTraceEnabled()) LOG.trace("createTable: epoch pushed into regions for : " + desc.getNameAsString());
 +        if (LOG.isTraceEnabled()) LOG.trace("Exit createTable, txid: " + transID + " Table: " + desc.getNameAsString());
      }
  
      public void truncateTableOnAbort(String tblName, long transID) throws IOException {
@@@ -419,21 -261,15 +268,18 @@@
      static public synchronized void unregisterTransaction(final long transactionID) {
        TransactionState ts = null;
        if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction txid: " + transactionID);
-       try {
--        ts = mapTransactionStates.remove(transactionID);
-       } catch (Exception e) {
-         LOG.warn("Ignoring exception. mapTransactionStates.remove for transid " + transactionID + 
-                  " failed with exception " + e);
-         return;
-       }
++      ts = mapTransactionStates.remove(transactionID);
        if (ts == null) {
          LOG.warn("mapTransactionStates.remove did not find transid " + transactionID);
        }
++      if (LOG.isTraceEnabled()) LOG.trace("Exit unregisterTransaction txid: " + transactionID);
      }
  
      // Not used?
      static public synchronized void unregisterTransaction(TransactionState ts) {
++        if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction ts: " + ts.getTransactionId());
          mapTransactionStates.remove(ts.getTransactionId());
++        if (LOG.isTraceEnabled()) LOG.trace("Exit unregisterTransaction ts: " + ts.getTransactionId());
      }
  
      public synchronized Result get(final long transactionID, final Get get) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
----------------------------------------------------------------------
diff --cc core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
index 783b70c,783b70c..9985861
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
@@@ -106,8 -106,8 +106,8 @@@ public class TmDDL 
     public void putRow(final long transid, final String Operation, final String tableName) throws IOException {
  
          long threadId = Thread.currentThread().getId();
--        if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow Operation, TxID: " + transid + "Thread ID:" + threadId 
--                + "TableName:" + tableName + "Operation :" + Operation);
++        if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow Operation, TxID: " + transid + " Thread ID:" + threadId 
++                + " TableName:" + tableName + " Operation :" + Operation);
          byte [] value = null;
          StringBuilder tableString = null;
          Result r = null;

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
----------------------------------------------------------------------
diff --cc core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
index 9c950d7,2c7e6af..cc4b2fc
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
@@@ -70,22 -70,22 +70,25 @@@ import org.apache.hadoop.hbase.client.H
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.coprocessor.Batch;
  import org.apache.hadoop.hbase.client.Durability;
++import org.apache.hadoop.hbase.client.RegionLocator;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleRequest;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleResponse;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionRequest;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionResponse;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleRequest;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleResponse;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleRequest;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleResponse;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequest;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestRequest;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestResponse;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitResponse;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse;
  import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleRequest;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleResponse;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleRequest;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleResponse;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleRequest;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleResponse;
  
  import org.apache.hadoop.hbase.exceptions.DeserializationException;
  import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@@ -1453,6 -1452,6 +1456,72 @@@ public class TransactionManager 
        if(LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch -- EXIT txID: " + transactionId);
        return 0;
      }
++  
++    public Integer pushRegionEpochX(final TransactionState txState,
++          final HRegionLocation location, HConnection connection) throws IOException {
++       if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Entry txState: " + txState
++                     + " location: " + location);
++
++       Batch.Call<TrxRegionService, PushEpochResponse> callable =
++           new Batch.Call<TrxRegionService, PushEpochResponse>() {
++              ServerRpcController controller = new ServerRpcController();
++              BlockingRpcCallback<PushEpochResponse> rpcCallback =
++                 new BlockingRpcCallback<PushEpochResponse>();
++
++           public PushEpochResponse call(TrxRegionService instance) throws IOException {
++              org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest.Builder
++              builder = PushEpochRequest.newBuilder();
++              builder.setTransactionId(txState.getTransactionId());
++              builder.setEpoch(txState.getStartEpoch());
++              builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(location.getRegionInfo().getRegionName())));
++              instance.pushOnlineEpoch(controller, builder.build(), rpcCallback);
++              return rpcCallback.get();
++           }
++       };
++
++       Map<byte[], PushEpochResponse> result = null;
++       if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- before coprocessorService: startKey: "
++              + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
++
++       boolean loopExit = false;
++       do
++       {
++         try {
++           result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
++           loopExit = true; 
++         } 
++         catch (ServiceException se) {
++            if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- ServiceException ", se);
++            throw new IOException(se);
++         }
++         catch (Throwable t) {
++            if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Throwable ", t);
++            throw new IOException(t);
++         }
++
++       } while (loopExit == false);
++
++
++       if(result.size() == 1){
++          // size is 1
++          for (PushEpochResponse eresponse : result.values()){
++             if(eresponse.getHasException()) {
++                String exceptionString = new String (eresponse.getException().toString());
++                LOG.error("pushRegionEpochX - coprocessor exceptionString: " + exceptionString);
++                throw new IOException(eresponse.getException());
++             }
++          }
++       }
++       else {
++          LOG.error("pushRegionEpochX, received incorrect result size: " + result.size() + " txid: "
++                + txState.getTransactionId() + " location: " + location.getRegionInfo().getRegionNameAsString());
++          return 1;
++       }
++       if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Exit txState: " + txState
++                + " location: " + location);
++       return 0;       
++    }
++
    } // TransactionManagerCallable
  
    private void checkException(TransactionState ts, List<TransactionRegionLocation> locations, List<String> exceptions) throws IOException {
@@@ -1602,7 -1600,7 +1671,7 @@@
           if (LOG.isTraceEnabled()) LOG.trace("beginTransaction NOT retrieving new startId");
        }
        if (LOG.isTraceEnabled()) LOG.trace("beginTransaction setting transaction: [" + ts.getTransactionId() +
--                      "] with startId: " + startIdVal);
++                      "], startEpoch: " + ts.getStartEpoch() + " and startId: " + startIdVal);
        ts.setStartId(startIdVal);
        return ts;
      }
@@@ -1971,6 -1969,6 +2040,56 @@@
        if(LOG.isTraceEnabled()) LOG.trace("retryCommit -- EXIT -- txid: " + transactionState.getTransactionId());
      }
  
++    public void pushRegionEpoch (HTableDescriptor desc, final TransactionState ts) throws IOException {
++       LOG.info("pushRegionEpoch start; transId: " + ts.getTransactionId());
++
++       TransactionalTable ttable1 = new TransactionalTable(Bytes.toBytes(desc.getNameAsString()));
++       HConnection connection = ttable1.getConnection();
++       long lvTransid = ts.getTransactionId();
++       RegionLocator rl = connection.getRegionLocator(desc.getTableName());
++       List<HRegionLocation> regionList = rl.getAllRegionLocations();
++       // (need one CompletionService per request for thread safety, can share pool of threads
++       CompletionService<Integer> compPool = new ExecutorCompletionService<Integer>(threadPool);
++
++       boolean complete = false;
++       int loopCount = 0;
++       int result = 0;
++       for (HRegionLocation location : regionList) {
++          final byte[] regionName = location.getRegionInfo().getRegionName();
++          final HConnection lv_connection = connection;
++          final TransactionRegionLocation lv_location = 
++                                 new TransactionRegionLocation(location.getRegionInfo(), location.getServerName());
++          compPool.submit(new TransactionManagerCallable(ts, lv_location, lv_connection) {
++             public Integer call() throws IOException {
++                return pushRegionEpochX(ts, lv_location, lv_connection);
++             }
++          });
++          boolean loopExit = false;
++          do
++          {
++            try {
++              result = compPool.take().get();
++              loopExit = true; 
++            } 
++            catch (InterruptedException ie) {}
++            catch (ExecutionException e) {
++               if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch -- ExecutionException ", e);
++               throw new IOException(e);
++            }
++
++          } while (loopExit == false);
++
++          if ( result != 0 ){
++             LOG.error("pushRegionEpoch result " + result + " returned from region "
++                          + location.getRegionInfo().getRegionName());
++             throw new IOException("pushRegionEpoch result " + result + " returned from region "
++                      + location.getRegionInfo().getRegionName());
++          }
++       }
++       if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch end transid: " + ts.getTransactionId());
++       return;
++    }
++
      public void retryAbort(final TransactionState transactionState) throws IOException {
        if(LOG.isTraceEnabled()) LOG.trace("retryAbort -- ENTRY -- txid: " + transactionState.getTransactionId());
        synchronized(transactionState.getRetryRegions()) {
@@@ -2639,6 -2637,6 +2758,12 @@@
  
              //record this create in TmDDL.
              tmDDL.putRow( transactionState.getTransactionId(), "CREATE", desc.getNameAsString());
++
++            if (LOG.isTraceEnabled()) LOG.trace("createTable: epoch pushed into regions for : " + desc.getNameAsString());
++            pushRegionEpoch(desc, transactionState);
++
++          if (LOG.isTraceEnabled()) LOG.trace("createTable EXIT, transactionState: " + transactionState.getTransactionId());
++
      }
  
      private class ChangeFlags {
@@@ -2883,7 -2881,7 +3008,7 @@@
   
      public void alterTable(final TransactionState transactionState, String tblName, Object[]  tableOptions)
             throws IOException {
--        if (LOG.isTraceEnabled()) LOG.trace("createTable ENTRY, transactionState: " + transactionState.getTransactionId());
++        if (LOG.isTraceEnabled()) LOG.trace("alterTable ENTRY, transactionState: " + transactionState.getTransactionId());
          
             HTableDescriptor htblDesc = hbadmin.getTableDescriptor(tblName.getBytes());
             HColumnDescriptor[] families = htblDesc.getColumnFamilies();
@@@ -2908,6 -2906,6 +3033,7 @@@
  
             //record this create in TmDDL.
             tmDDL.putRow( transactionState.getTransactionId(), "ALTER", tblName);
++           if (LOG.isTraceEnabled()) LOG.trace("alterTable EXIT, transactionState: " + transactionState.getTransactionId());
      }
  
      public void registerTruncateOnAbort(final TransactionState transactionState, String tblName)

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --cc core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index 4b1c4d8,4b1c4d8..ae058de
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@@ -706,6 -706,6 +706,9 @@@ public class HBaseTxClient 
  
           throw new Exception("createTable call error");
        }
++
++      
++      if (LOG.isTraceEnabled()) LOG.trace("Exit callCreateTable, txid: [" + transactionId + "] returning RET_OK");
        return TransReturnCode.RET_OK.getShort();
     }