You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by db...@apache.org on 2016/07/18 22:40:56 UTC
[4/8] incubator-trafodion git commit: Fix to correct loss of updates
following a regionServer failure v2
Fix to correct loss of updates following a regionServer failure v2
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/c74e3d62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/c74e3d62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/c74e3d62
Branch: refs/heads/master
Commit: c74e3d62c509c28d890a8329f4346b4b80698064
Parents: 9fc659a
Author: Sean Broeder <sb...@edev06.esgyn.local>
Authored: Fri Jul 1 21:43:38 2016 +0000
Committer: Sean Broeder <sb...@edev06.esgyn.local>
Committed: Wed Jul 6 19:17:17 2016 +0000
----------------------------------------------------------------------
.../hbase/client/transactional/RMInterface.java | 176 +-
.../transactional/TransactionManager.java | 12 +-
.../client/transactional/TransactionState.java | 16 +-
.../transactional/TrxRegionEndpoint.java.tmpl | 65 +-
.../generated/SsccRegionProtos.java | 21 +
.../generated/TrxRegionProtos.java | 2418 +++++++++++++-----
.../hbase-trx/src/main/protobuf/TrxRegion.proto | 21 +-
7 files changed, 2103 insertions(+), 626 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
index df74a45..fe98284 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
@@ -38,14 +38,19 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.client.transactional.TransactionManager;
import org.apache.hadoop.hbase.client.transactional.TransactionState;
@@ -68,6 +73,18 @@ import org.apache.hadoop.hbase.regionserver.transactional.IdTm;
import org.apache.hadoop.hbase.regionserver.transactional.IdTmException;
import org.apache.hadoop.hbase.regionserver.transactional.IdTmId;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TrxRegionService;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse;
+
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.ByteString;
+
+import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
@@ -75,6 +92,12 @@ import java.util.List;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
public class RMInterface {
static final Log LOG = LogFactory.getLog(RMInterface.class);
@@ -84,6 +107,9 @@ public class RMInterface {
public AlgorithmType TRANSACTION_ALGORITHM;
static Map<Long, Set<RMInterface>> mapRMsPerTransaction = new HashMap<Long, Set<RMInterface>>();
private TransactionalTableClient ttable = null;
+ private ExecutorService threadPool;
+ private CompletionService<Integer> compPool;
+ private int intThreads = 16;
static {
System.loadLibrary("stmlib");
}
@@ -137,6 +163,130 @@ public class RMInterface {
}
+ public void pushRegionEpoch (HTableDescriptor desc, final TransactionState ts) throws IOException {
+ LOG.info("pushRegionEpoch start; transId: " + ts.getTransactionId());
+
+ TransactionalTable ttable1 = new TransactionalTable(Bytes.toBytes(desc.getNameAsString()));
+ HConnection connection = ttable1.getConnection();
+ long lvTransid = ts.getTransactionId();
+ RegionLocator rl = connection.getRegionLocator(desc.getTableName());
+ List<HRegionLocation> regionList = rl.getAllRegionLocations();
+
+ boolean complete = false;
+ int loopCount = 0;
+ int result = 0;
+
+ for (HRegionLocation location : regionList) {
+ final byte[] regionName = location.getRegionInfo().getRegionName();
+ if (compPool == null){
+ LOG.info("pushRegionEpoch compPool is null");
+ threadPool = Executors.newFixedThreadPool(intThreads);
+ compPool = new ExecutorCompletionService<Integer>(threadPool);
+ }
+
+ final HRegionLocation lv_location = location;
+ final HConnection lv_connection = connection;
+ compPool.submit(new RMCallable2(ts, lv_location, lv_connection ) {
+ public Integer call() throws IOException {
+ return pushRegionEpochX(ts, lv_location, lv_connection);
+ }
+ });
+ try {
+ result = compPool.take().get();
+ } catch(Exception ex) {
+ throw new IOException(ex);
+ }
+ if ( result != 0 ){
+ LOG.error("pushRegionEpoch result " + result + " returned from region "
+ + location.getRegionInfo().getRegionName());
+ throw new IOException("pushRegionEpoch result " + result + " returned from region "
+ + location.getRegionInfo().getRegionName());
+ }
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch end transid: " + ts.getTransactionId());
+ return;
+ }
+
+ private abstract class RMCallable2 implements Callable<Integer>{
+ TransactionState transactionState;
+ HRegionLocation location;
+ HConnection connection;
+ HTable table;
+ byte[] startKey;
+ byte[] endKey_orig;
+ byte[] endKey;
+
+ RMCallable2(TransactionState txState, HRegionLocation location, HConnection connection) {
+ this.transactionState = txState;
+ this.location = location;
+ this.connection = connection;
+ try {
+ table = new HTable(location.getRegionInfo().getTable(), connection);
+ } catch(IOException e) {
+ LOG.error("Error obtaining HTable instance " + e);
+ table = null;
+ }
+ startKey = location.getRegionInfo().getStartKey();
+ endKey_orig = location.getRegionInfo().getEndKey();
+ endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
+
+ }
+
+ public Integer pushRegionEpochX(final TransactionState txState,
+ final HRegionLocation location, HConnection connection) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Entry txState: " + txState
+ + " location: " + location);
+
+ Batch.Call<TrxRegionService, PushEpochResponse> callable =
+ new Batch.Call<TrxRegionService, PushEpochResponse>() {
+ ServerRpcController controller = new ServerRpcController();
+ BlockingRpcCallback<PushEpochResponse> rpcCallback =
+ new BlockingRpcCallback<PushEpochResponse>();
+
+ @Override
+ public PushEpochResponse call(TrxRegionService instance) throws IOException {
+ org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest.Builder
+ builder = PushEpochRequest.newBuilder();
+ builder.setTransactionId(txState.getTransactionId());
+ builder.setEpoch(txState.getStartEpoch());
+ builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(location.getRegionInfo().getRegionName())));
+ instance.pushOnlineEpoch(controller, builder.build(), rpcCallback);
+ return rpcCallback.get();
+ }
+ };
+
+ Map<byte[], PushEpochResponse> result = null;
+ try {
+ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- before coprocessorService: startKey: "
+ + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
+ result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
+ } catch (Throwable e) {
+ String msg = "ERROR occurred while calling pushRegionEpoch coprocessor service in pushRegionEpochX";
+ LOG.error(msg + ":" + e);
+ throw new IOException(msg);
+ }
+
+ if(result.size() == 1){
+ // size is 1
+ for (PushEpochResponse eresponse : result.values()){
+ if(eresponse.getHasException()) {
+ String exceptionString = new String (eresponse.getException().toString());
+ LOG.error("pushRegionEpochX - coprocessor exceptionString: " + exceptionString);
+ throw new IOException(eresponse.getException());
+ }
+ }
+ }
+ else {
+ LOG.error("pushRegionEpochX, received incorrect result size: " + result.size() + " txid: "
+ + txState.getTransactionId() + " location: " + location.getRegionInfo().getRegionNameAsString());
+ return 1;
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Exit txState: " + txState
+ + " location: " + location);
+ return 0;
+ }
+ }
+
public synchronized TransactionState registerTransaction(final long transactionID, final byte[] row) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter registerTransaction, transaction ID: " + transactionID);
boolean register = false;
@@ -222,12 +372,20 @@ public class RMInterface {
}
public void createTable(HTableDescriptor desc, byte[][] keys, int numSplits, int keyLength, long transID) throws IOException {
-
- if (LOG.isTraceEnabled()) LOG.trace("createTable ENTER: ");
- byte[] lv_byte_desc = desc.toByteArray();
- byte[] lv_byte_tblname = desc.getNameAsString().getBytes();
- if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc));
- createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname);
+ if (LOG.isTraceEnabled()) LOG.trace("Enter createTable, txid: " + transID + " Table: " + desc.getNameAsString());
+ byte[] lv_byte_desc = desc.toByteArray();
+ byte[] lv_byte_tblname = desc.getNameAsString().getBytes();
+ if (LOG.isTraceEnabled()) LOG.trace("createTable: htabledesc bytearray: " + lv_byte_desc + "desc in hex: " + Hex.encodeHexString(lv_byte_desc));
+ createTableReq(lv_byte_desc, keys, numSplits, keyLength, transID, lv_byte_tblname);
+ TransactionState ts = mapTransactionStates.get(transID);
+ if (LOG.isTraceEnabled()) LOG.trace("createTable: pushing epoch into regions for : " + desc.getNameAsString());
+ if (ts == null){
+ if (LOG.isTraceEnabled()) LOG.trace("pushing epoch into regions but unable to get ts object for transID : " + transID);
+ throw new IOException("createTable push epoch exception for table " + desc.getNameAsString());
+ }
+ pushRegionEpoch(desc, ts);
+ if (LOG.isTraceEnabled()) LOG.trace("createTable: epoch pushed into regions for : " + desc.getNameAsString());
+ if (LOG.isTraceEnabled()) LOG.trace("Exit createTable, txid: " + transID + " Table: " + desc.getNameAsString());
}
public void truncateTableOnAbort(String tblName, long transID) throws IOException {
@@ -261,7 +419,13 @@ public class RMInterface {
static public synchronized void unregisterTransaction(final long transactionID) {
TransactionState ts = null;
if (LOG.isTraceEnabled()) LOG.trace("Enter unregisterTransaction txid: " + transactionID);
+ try {
ts = mapTransactionStates.remove(transactionID);
+ } catch (Exception e) {
+ LOG.warn("Ignoring exception. mapTransactionStates.remove for transid " + transactionID +
+ " failed with exception " + e);
+ return;
+ }
if (ts == null) {
LOG.warn("mapTransactionStates.remove did not find transid " + transactionID);
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
index 9d90ace..9c950d7 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
@@ -608,9 +608,9 @@ public class TransactionManager {
* Return : Commit vote (yes, no, read only)
* Purpose : Call prepare for a given regionserver
*/
- public Integer doPrepareX(final byte[] regionName, final long transactionId, final long startEpoc, final int participantNum, final TransactionRegionLocation location)
+ public Integer doPrepareX(final byte[] regionName, final long transactionId, final long startEpoch, final int participantNum, final TransactionRegionLocation location)
throws IOException, CommitUnsuccessfulException {
- if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- ENTRY txid: " + transactionId + " startEpoc " + startEpoc
+ if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- ENTRY txid: " + transactionId + " startEpoch " + startEpoch
+ " participantNum " + participantNum + " RegionName " + Bytes.toString(regionName)
+ " TableName " + table.toString() + " location " + location );
int commitStatus = 0;
@@ -632,7 +632,7 @@ public class TransactionManager {
public CommitRequestResponse call(TrxRegionService instance) throws IOException {
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CommitRequestRequest.Builder builder = CommitRequestRequest.newBuilder();
builder.setTransactionId(transactionId);
- builder.setStartEpoc(startEpoc);
+ builder.setStartEpoch(startEpoch);
builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(regionName)));
builder.setParticipantNum(participantNum);
@@ -1581,7 +1581,7 @@ public class TransactionManager {
//long transactionId =
if (LOG.isTraceEnabled()) LOG.trace("Enter beginTransaction, txid: " + transactionId);
TransactionState ts = new TransactionState(transactionId);
- ts.setStartEpoc(EnvironmentEdgeManager.currentTime());
+ ts.setStartEpoch(EnvironmentEdgeManager.currentTime());
long startIdVal = -1;
// Set the startid
@@ -1698,7 +1698,7 @@ public class TransactionManager {
public Integer call() throws CommitUnsuccessfulException, IOException {
return doPrepareX(location.getRegionInfo().getRegionName(),
- transactionState.getTransactionId(), transactionState.getStartEpoc(), lvParticipantNum,
+ transactionState.getTransactionId(), transactionState.getStartEpoch(), lvParticipantNum,
location);
}
});
@@ -1783,7 +1783,7 @@ public class TransactionManager {
compPool.submit(new TransactionManagerCallable(transactionState, location, connection) {
public Integer call() throws IOException, CommitUnsuccessfulException {
- return doPrepareX(regionName, transactionState.getTransactionId(), transactionState.getStartEpoc(), lvParticipantNum, myLocation);
+ return doPrepareX(regionName, transactionState.getTransactionId(), transactionState.getStartEpoch(), lvParticipantNum, myLocation);
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
index 7c837f4..2dfcc93 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
@@ -46,7 +46,7 @@ public class TransactionState {
private final long transactionId;
private TransState status;
- private long startEpoc;
+ private long startEpoch;
private long startId;
private long commitId;
@@ -367,17 +367,17 @@ public class TransactionState {
* Set the startEpoc.
*
*/
- public void setStartEpoc(final long epoc) {
- this.startEpoc = epoc;
+ public void setStartEpoch(final long epoch) {
+ this.startEpoch = epoch;
}
/**
- * Get the startEpoc.
+ * Get the startEpoch.
*
- * @return Return the startEpoc.
+ * @return Return the startEpoch.
*/
- public long getStartEpoc() {
- return startEpoc;
+ public long getStartEpoch() {
+ return startEpoch;
}
/**
@@ -420,7 +420,7 @@ public class TransactionState {
@Override
public String toString() {
return "transactionId: " + transactionId + ", startId: " + startId + ", commitId: " + commitId +
- ", startEpoc: " + startEpoc + ", participants: " + participatingRegions.size()
+ ", startEpoch: " + startEpoch + ", participants: " + participatingRegions.size()
+ ", ignoring: " + regionsToIgnore.size() + ", hasDDL: " + hasDDLTx()
+ ", state: " + status.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
index e351eb3..0bbca72 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java.tmpl
@@ -210,6 +210,8 @@ import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProt
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutTransactionalResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PutMultipleTransactionalResponse;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.RecoveryRequestResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TlogDeleteRequest;
@@ -304,7 +306,7 @@ CoprocessorService, Coprocessor {
private int regionState = 0;
private Path recoveryTrxPath = null;
private int cleanAT = 0;
- private long onlineEpoc = EnvironmentEdgeManager.currentTime();
+ private long onlineEpoch = EnvironmentEdgeManager.currentTime();
private long[] commitCheckTimes = new long[50];
private long[] hasConflictTimes = new long[50];
@@ -789,7 +791,7 @@ CoprocessorService, Coprocessor {
boolean reply = false;
long transactionId = request.getTransactionId();
long commitId = request.getCommitId();
- long startEpoc = request.getStartEpoc();
+ long startEpoch = request.getStartEpoch();
final int participantNum = request.getParticipantNum();
Throwable t = null;
WrongRegionException wre = null;
@@ -811,7 +813,7 @@ CoprocessorService, Coprocessor {
// Process local memory
try {
if (LOG.isDebugEnabled()) LOG.debug("commitIfPossible - txId " + transactionId + ", regionName, " + regionInfo.getRegionNameAsString() + "calling internal commitIfPossible");
- reply = commitIfPossible(transactionId, startEpoc, commitId, participantNum);
+ reply = commitIfPossible(transactionId, startEpoch, commitId, participantNum);
} catch (Throwable e) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", Caught exception after internal commitIfPossible call "
+ e.getMessage() + " " + stackTraceToString(e));
@@ -852,12 +854,12 @@ CoprocessorService, Coprocessor {
Throwable t = null;
WrongRegionException wre = null;
long transactionId = request.getTransactionId();
- long startEpoc = request.getStartEpoc();
+ long startEpoch = request.getStartEpoch();
int participantNum = request.getParticipantNum();
boolean dropTableRecorded = request.getDropTableRecorded();
if (LOG.isTraceEnabled()) LOG.trace("commitRequest - txId "
- + transactionId + ", startEpoc " + startEpoc + ", participantNum " + participantNum + ", dropTableRecorded " + dropTableRecorded +
+ + transactionId + ", startEpoch " + startEpoch + ", participantNum " + participantNum + ", dropTableRecorded " + dropTableRecorded +
", regionName " + regionInfo.getRegionNameAsString());
/* commenting out for the time being
@@ -876,7 +878,7 @@ CoprocessorService, Coprocessor {
{
// Process local memory
try {
- status = commitRequest(transactionId, startEpoc, participantNum, dropTableRecorded);
+ status = commitRequest(transactionId, startEpoch, participantNum, dropTableRecorded);
} catch (UnknownTransactionException u) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString());
ute = u;
@@ -935,7 +937,7 @@ CoprocessorService, Coprocessor {
Throwable t = null;
WrongRegionException wre = null;
long transactionId = request.getTransactionId();
- long startEpoc = request.getStartEpoc();
+ long startEpoch = request.getStartEpoch();
int i = 0;
int numOfRegion = request.getRegionNameCount();
String requestRegionName;
@@ -978,8 +980,8 @@ CoprocessorService, Coprocessor {
commitRequestMultipleResponseBuilder.setException(i, BatchException.EXCEPTION_REGIONNOTFOUND_ERR.toString());
}
else {
- if (i == (numOfRegion - 1)) {status = regionEPCP.commitRequest(transactionId, startEpoc, i, true);} // only the last region flush
- else {status = regionEPCP.commitRequest(transactionId, startEpoc, i, false);}
+ if (i == (numOfRegion - 1)) {status = regionEPCP.commitRequest(transactionId, startEpoch, i, true);} // only the last region flush
+ else {status = regionEPCP.commitRequest(transactionId, startEpoch, i, false);}
}
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint commitRequestMultiple ends");
//status = commitRequest(transactionId);
@@ -5099,16 +5101,16 @@ CoprocessorService, Coprocessor {
* @return TransactionRegionInterface commit code
* @throws IOException
*/
- public int commitRequest(final long transactionId, final long startEpoc, final int participantNum) throws IOException, UnknownTransactionException {
- return commitRequest(transactionId, startEpoc, participantNum, true, false);
+ public int commitRequest(final long transactionId, final long startEpoch, final int participantNum) throws IOException, UnknownTransactionException {
+ return commitRequest(transactionId, startEpoch, participantNum, true, false);
}
- public int commitRequest(final long transactionId, final long startEpoc, final int participantNum, final boolean dropTableRecorded)
+ public int commitRequest(final long transactionId, final long startEpoch, final int participantNum, final boolean dropTableRecorded)
throws IOException, UnknownTransactionException {
- return commitRequest(transactionId, startEpoc, participantNum, true, dropTableRecorded);
+ return commitRequest(transactionId, startEpoch, participantNum, true, dropTableRecorded);
}
- public int commitRequest(final long transactionId, final long startEpoc, final int participantNum, boolean flushHLOG,
+ public int commitRequest(final long transactionId, final long startEpoch, final int participantNum, boolean flushHLOG,
boolean dropTableRecorded) throws IOException,
UnknownTransactionException {
long txid = 0;
@@ -5119,9 +5121,9 @@ CoprocessorService, Coprocessor {
checkBlockNonPhase2(transactionId);
TrxTransactionState state;
- if (startEpoc < onlineEpoc) {
+ if (startEpoch < onlineEpoch) {
LOG.info("commitRequest txId: "
- + transactionId + " startEpoc " + startEpoc + " is less than region's onlineEpoc " + onlineEpoc
+ + transactionId + " startEpoch " + startEpoch + " is less than region's onlineEpoch " + onlineEpoch
+ " for regionName " + lv_regionName
+ " must return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR ");
return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
@@ -5527,7 +5529,7 @@ CoprocessorService, Coprocessor {
* @return boolean
* @throws IOException
*/
- public boolean commitIfPossible(final long transactionId, final long startEpoc, final long commitId, final int participantNum)
+ public boolean commitIfPossible(final long transactionId, final long startEpoch, final long commitId, final int participantNum)
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: "
@@ -5535,7 +5537,7 @@ CoprocessorService, Coprocessor {
checkBlockNonPhase2(transactionId);
- int status = commitRequest(transactionId, startEpoc, participantNum);
+ int status = commitRequest(transactionId, startEpoch, participantNum);
if (status == COMMIT_OK) {
@@ -6050,6 +6052,27 @@ CoprocessorService, Coprocessor {
}
}
}
+
+ public void pushOnlineEpoch(RpcController controller,
+ PushEpochRequest request, RpcCallback<PushEpochResponse> done) {
+
+ org.apache.hadoop.hbase.client.Result result = null;
+ long transId = request.getTransactionId();
+ long tmpEpoch = request.getEpoch();
+
+ if (LOG.isTraceEnabled()) LOG.trace("pushOnlineEpoch ENTRY. Epoch " + tmpEpoch
+ + " in region: " + regionInfo.getRegionNameAsString());
+
+ this.onlineEpoch = tmpEpoch;
+
+ org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochResponse.Builder pushEpochResponseBuilder = PushEpochResponse.newBuilder();
+ pushEpochResponseBuilder.setHasException(false);
+
+ PushEpochResponse epochResponse = pushEpochResponseBuilder.build();
+ done.run(epochResponse);
+ }
+
+
public void flushToFS(Path flushPath) throws IOException {
if(LOG.isTraceEnabled()) LOG.trace("flushToFS -- ENTRY, Path: " + flushPath.toString());
@@ -6089,7 +6112,7 @@ CoprocessorService, Coprocessor {
}
}
txnPersistBuilder.setNextSeqId(nextSequenceId.get());
- txnPersistBuilder.setOnlineEpoc(this.onlineEpoc);
+ txnPersistBuilder.setOnlineEpoch(this.onlineEpoch);
ByteArrayOutputStream output = new ByteArrayOutputStream();
@@ -6251,8 +6274,8 @@ CoprocessorService, Coprocessor {
}
this.nextSequenceId = new AtomicLong(txnPersistMsg.getNextSeqId());
- this.onlineEpoc = txnPersistMsg.getOnlineEpoc();
- LOG.info("Setting onlineEpoc after split to " + this.onlineEpoc);
+ this.onlineEpoch = txnPersistMsg.getOnlineEpoch();
+ LOG.info("Setting onlineEpoch after split to " + this.onlineEpoch);
} catch(IOException e) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c74e3d62/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
index 79a687b..71a8a49 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/generated/SsccRegionProtos.java
@@ -1,3 +1,24 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: SsccRegion.proto