You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2016/07/18 22:40:57 UTC
[5/8] incubator-trafodion git commit: Cherry-pick changes for JIRA 2095
Cherry-pick changes for JIRA 2095
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/b452921f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/b452921f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/b452921f
Branch: refs/heads/master
Commit: b452921fe23a7f6c1cebc8dd670fbcc98a7e88d9
Parents: c74e3d6 cc51150
Author: Sean Broeder <sb...@edev06.esgyn.local>
Authored: Thu Jul 7 03:50:39 2016 +0000
Committer: Sean Broeder <sb...@edev06.esgyn.local>
Committed: Sun Jul 17 03:47:31 2016 +0000
----------------------------------------------------------------------
.../hbase/client/transactional/RMInterface.java | 162 +------------------
.../hbase/client/transactional/TmDDL.java | 4 +-
.../transactional/TransactionManager.java | 142 +++++++++++++++-
.../java/org/trafodion/dtm/HBaseTxClient.java | 3 +
core/sql/regress/executor/EXPECTED022.SB | 16 +-
docs/shared/license.txt | 5 +
docs/shared/revisions.txt | 4 +-
docs/src/site/markdown/download.md | 8 +-
docs/src/site/markdown/index.md | 8 +-
docs/src/site/markdown/release-notes-2-0-1.md | 10 +-
docs/src/site/resources/css/site.css | 1 +
docs/src/site/site.xml | 12 +-
12 files changed, 181 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
----------------------------------------------------------------------
diff --cc core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
index fe98284,df74a45..dc3e140
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
@@@ -73,18 -68,6 +68,13 @@@ import org.apache.hadoop.hbase.regionse
import org.apache.hadoop.hbase.regionserver.transactional.IdTmException;
import org.apache.hadoop.hbase.regionserver.transactional.IdTmId;
- import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
- import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest;
- import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse;
-
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.ByteString;
+
- import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
@@@ -372,20 -222,12 +229,12 @@@ public class RMInterface
}
public void createTable(HTableDescriptor desc, byte[][] keys, int numSplits, int keyLength, long transID) throws IOException {
-
- if (LOG.isTraceEnabled()) LOG.trace("createTable ENTER: ");
- byte[] lv_byte_desc = desc.toByteArray();
- byte[] lv_byte_tblname = desc.getNameAsString().getBytes();
- if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc));
- createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname);
+ if (LOG.isTraceEnabled()) LOG.trace("Enter createTable, txid: " + transID + " Table: " + desc.getNameAsString());
+ byte[] lv_byte_desc = desc.toByteArray();
+ byte[] lv_byte_tblname = desc.getNameAsString().getBytes();
+ if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc));
+ createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname);
- TransactionState ts = mapTransactionStates.get(transID);
- if (LOG.isTraceEnabled()) LOG.trace("createTable: pushing epoch into regions for : " + desc.getNameAsString());
- if (ts == null){
- if (LOG.isTraceEnabled()) LOG.trace("pushing epoch into regions but unable to get ts object for transID : " + transID);
- throw new IOException("createTable push epoch exception for table " + desc.getNameAsString());
- }
- pushRegionEpoch(desc, ts);
- if (LOG.isTraceEnabled()) LOG.trace("createTable: epoch pushed into regions for : " + desc.getNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("Exit createTable, txid: " + transID + " Table: " + desc.getNameAsString());
}
public void truncateTableOnAbort(String tblName, long transID) throws IOException {
@@@ -419,21 -261,15 +268,18 @@@
static public synchronized void unregisterTransaction(final long transactionID) {
TransactionState ts = null;
if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction txid: " + transactionID);
- try {
-- ts = mapTransactionStates.remove(transactionID);
- } catch (Exception e) {
- LOG.warn("Ignoring exception. mapTransactionStates.remove for transid " + transactionID +
- " failed with exception " + e);
- return;
- }
++ ts = mapTransactionStates.remove(transactionID);
if (ts == null) {
LOG.warn("mapTransactionStates.remove did not find transid " + transactionID);
}
++ if (LOG.isTraceEnabled()) LOG.trace("Exit unregisterTransaction txid: " + transactionID);
}
// Not used?
static public synchronized void unregisterTransaction(TransactionState ts) {
++ if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction ts: " + ts.getTransactionId());
mapTransactionStates.remove(ts.getTransactionId());
++ if (LOG.isTraceEnabled()) LOG.trace("Exit unregisterTransaction ts: " + ts.getTransactionId());
}
public synchronized Result get(final long transactionID, final Get get) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
----------------------------------------------------------------------
diff --cc core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
index 783b70c,783b70c..9985861
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TmDDL.java
@@@ -106,8 -106,8 +106,8 @@@ public class TmDDL
public void putRow(final long transid, final String Operation, final String tableName) throws IOException {
long threadId = Thread.currentThread().getId();
-- if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow Operation, TxID: " + transid + "Thread ID:" + threadId
-- + "TableName:" + tableName + "Operation :" + Operation);
++ if (LOG.isTraceEnabled()) LOG.trace("TmDDL putRow Operation, TxID: " + transid + " Thread ID:" + threadId
++ + " TableName:" + tableName + " Operation :" + Operation);
byte [] value = null;
StringBuilder tableString = null;
Result r = null;
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
----------------------------------------------------------------------
diff --cc core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
index 9c950d7,2c7e6af..cc4b2fc
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
@@@ -70,22 -70,22 +70,25 @@@ import org.apache.hadoop.hbase.client.H
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.Durability;
++import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleRequest;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionResponse;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleRequest;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleResponse;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleRequest;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitResponse;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest;
++import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleRequest;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitMultipleResponse;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleRequest;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.AbortTransactionMultipleResponse;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleRequest;
--import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestMultipleResponse;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@@ -1453,6 -1452,6 +1456,72 @@@ public class TransactionManager
if(LOG.isTraceEnabled()) LOG.trace("doAbortX - Batch -- EXIT txID: " + transactionId);
return 0;
}
++
++ public Integer pushRegionEpochX(final TransactionState txState,
++ final HRegionLocation location, HConnection connection) throws IOException {
++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Entry txState: " + txState
++ + " location: " + location);
++
++ Batch.Call<TrxRegionService, PushEpochResponse> callable =
++ new Batch.Call<TrxRegionService, PushEpochResponse>() {
++ ServerRpcController controller = new ServerRpcController();
++ BlockingRpcCallback<PushEpochResponse> rpcCallback =
++ new BlockingRpcCallback<PushEpochResponse>();
++
++ public PushEpochResponse call(TrxRegionService instance) throws IOException {
++ org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest.Builder
++ builder = PushEpochRequest.newBuilder();
++ builder.setTransactionId(txState.getTransactionId());
++ builder.setEpoch(txState.getStartEpoch());
++ builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(location.getRegionInfo().getRegionName())));
++ instance.pushOnlineEpoch(controller, builder.build(), rpcCallback);
++ return rpcCallback.get();
++ }
++ };
++
++ Map<byte[], PushEpochResponse> result = null;
++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- before coprocessorService: startKey: "
++ + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
++
++ boolean loopExit = false;
++ do
++ {
++ try {
++ result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
++ loopExit = true;
++ }
++ catch (ServiceException se) {
++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- ServiceException ", se);
++ throw new IOException(se);
++ }
++ catch (Throwable t) {
++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Throwable ", t);
++ throw new IOException(t);
++ }
++
++ } while (loopExit == false);
++
++
++ if(result.size() == 1){
++ // size is 1
++ for (PushEpochResponse eresponse : result.values()){
++ if(eresponse.getHasException()) {
++ String exceptionString = new String (eresponse.getException().toString());
++ LOG.error("pushRegionEpochX - coprocessor exceptionString: " + exceptionString);
++ throw new IOException(eresponse.getException());
++ }
++ }
++ }
++ else {
++ LOG.error("pushRegionEpochX, received incorrect result size: " + result.size() + " txid: "
++ + txState.getTransactionId() + " location: " + location.getRegionInfo().getRegionNameAsString());
++ return 1;
++ }
++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Exit txState: " + txState
++ + " location: " + location);
++ return 0;
++ }
++
} // TransactionManagerCallable
private void checkException(TransactionState ts, List<TransactionRegionLocation> locations, List<String> exceptions) throws IOException {
@@@ -1602,7 -1600,7 +1671,7 @@@
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction NOT retrieving new startId");
}
if (LOG.isTraceEnabled()) LOG.trace("beginTransaction setting transaction: [" + ts.getTransactionId() +
-- "] with startId: " + startIdVal);
++ "], startEpoch: " + ts.getStartEpoch() + " and startId: " + startIdVal);
ts.setStartId(startIdVal);
return ts;
}
@@@ -1971,6 -1969,6 +2040,56 @@@
if(LOG.isTraceEnabled()) LOG.trace("retryCommit -- EXIT -- txid: " + transactionState.getTransactionId());
}
++ public void pushRegionEpoch (HTableDescriptor desc, final TransactionState ts) throws IOException {
++ LOG.info("pushRegionEpoch start; transId: " + ts.getTransactionId());
++
++ TransactionalTable ttable1 = new TransactionalTable(Bytes.toBytes(desc.getNameAsString()));
++ HConnection connection = ttable1.getConnection();
++ long lvTransid = ts.getTransactionId();
++ RegionLocator rl = connection.getRegionLocator(desc.getTableName());
++ List<HRegionLocation> regionList = rl.getAllRegionLocations();
++ // (need one CompletionService per request for thread safety, can share pool of threads
++ CompletionService<Integer> compPool = new ExecutorCompletionService<Integer>(threadPool);
++
++ boolean complete = false;
++ int loopCount = 0;
++ int result = 0;
++ for (HRegionLocation location : regionList) {
++ final byte[] regionName = location.getRegionInfo().getRegionName();
++ final HConnection lv_connection = connection;
++ final TransactionRegionLocation lv_location =
++ new TransactionRegionLocation(location.getRegionInfo(), location.getServerName());
++ compPool.submit(new TransactionManagerCallable(ts, lv_location, lv_connection) {
++ public Integer call() throws IOException {
++ return pushRegionEpochX(ts, lv_location, lv_connection);
++ }
++ });
++ boolean loopExit = false;
++ do
++ {
++ try {
++ result = compPool.take().get();
++ loopExit = true;
++ }
++ catch (InterruptedException ie) {}
++ catch (ExecutionException e) {
++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch -- ExecutionException ", e);
++ throw new IOException(e);
++ }
++
++ } while (loopExit == false);
++
++ if ( result != 0 ){
++ LOG.error("pushRegionEpoch result " + result + " returned from region "
++ + location.getRegionInfo().getRegionName());
++ throw new IOException("pushRegionEpoch result " + result + " returned from region "
++ + location.getRegionInfo().getRegionName());
++ }
++ }
++ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch end transid: " + ts.getTransactionId());
++ return;
++ }
++
public void retryAbort(final TransactionState transactionState) throws IOException {
if(LOG.isTraceEnabled()) LOG.trace("retryAbort -- ENTRY -- txid: " + transactionState.getTransactionId());
synchronized(transactionState.getRetryRegions()) {
@@@ -2639,6 -2637,6 +2758,12 @@@
//record this create in TmDDL.
tmDDL.putRow( transactionState.getTransactionId(), "CREATE", desc.getNameAsString());
++
++ if (LOG.isTraceEnabled()) LOG.trace("createTable: epoch pushed into regions for : " + desc.getNameAsString());
++ pushRegionEpoch(desc, transactionState);
++
++ if (LOG.isTraceEnabled()) LOG.trace("createTable EXIT, transactionState: " + transactionState.getTransactionId());
++
}
private class ChangeFlags {
@@@ -2883,7 -2881,7 +3008,7 @@@
public void alterTable(final TransactionState transactionState, String tblName, Object[] tableOptions)
throws IOException {
-- if (LOG.isTraceEnabled()) LOG.trace("createTable ENTRY, transactionState: " + transactionState.getTransactionId());
++ if (LOG.isTraceEnabled()) LOG.trace("alterTable ENTRY, transactionState: " + transactionState.getTransactionId());
HTableDescriptor htblDesc = hbadmin.getTableDescriptor(tblName.getBytes());
HColumnDescriptor[] families = htblDesc.getColumnFamilies();
@@@ -2908,6 -2906,6 +3033,7 @@@
//record this create in TmDDL.
tmDDL.putRow( transactionState.getTransactionId(), "ALTER", tblName);
++ if (LOG.isTraceEnabled()) LOG.trace("alterTable EXIT, transactionState: " + transactionState.getTransactionId());
}
public void registerTruncateOnAbort(final TransactionState transactionState, String tblName)
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b452921f/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
----------------------------------------------------------------------
diff --cc core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
index 4b1c4d8,4b1c4d8..ae058de
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseTxClient.java
@@@ -706,6 -706,6 +706,9 @@@ public class HBaseTxClient
throw new Exception("createTable call error");
}
++
++
++ if (LOG.isTraceEnabled()) LOG.trace("Exit callCreateTable, txid: [" + transactionId + "] returning RET_OK");
return TransReturnCode.RET_OK.getShort();
}