You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by su...@apache.org on 2015/10/22 01:56:50 UTC
[4/6] incubator-trafodion git commit: [TRAFODION-34] Support region splitting/balancing
[TRAFODION-34] Support region splitting/balancing
with transactions active. The functionality is being checked in disabled.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/65d0f1cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/65d0f1cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/65d0f1cf
Branch: refs/heads/master
Commit: 65d0f1cfe43cb51077227277e5b6321fd69aa442
Parents: 46f7be4
Author: Oliver Bucaojit <ol...@esgyn.com>
Authored: Tue Oct 20 04:24:54 2015 +0000
Committer: Oliver Bucaojit <ol...@esgyn.com>
Committed: Tue Oct 20 04:24:54 2015 +0000
----------------------------------------------------------------------
.../transactional/TransactionManager.java | 204 +-
.../client/transactional/TransactionState.java | 10 +
.../transactional/TransactionalReturn.java | 1 +
.../transactional/TransactionalTable.java | 285 +-
.../transactional/SplitBalanceHelper.java | 296 ++
.../transactional/TrxRegionEndpoint.java | 272 +-
.../transactional/TrxRegionObserver.java | 232 +-
.../generated/TrxRegionProtos.java | 3001 +++++++++++++++++-
.../transactional/TransactionState.java | 13 +
.../transactional/TrxTransactionState.java | 6 +
.../hbase-trx/src/main/protobuf/TrxRegion.proto | 21 +
11 files changed, 4067 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
index 53b0bde..f8f5eaf 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
@@ -44,12 +44,14 @@
package org.apache.hadoop.hbase.client.transactional;
+import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
import java.util.List;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -62,6 +64,8 @@ import java.util.HashMap;
import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.fs.Path;
+
import org.apache.hadoop.hbase.ServerName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -282,7 +286,10 @@ public class TransactionManager {
}
startKey = location.getRegionInfo().getStartKey();
endKey_orig = location.getRegionInfo().getEndKey();
- endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
+ if(endKey_orig == null || endKey_orig == HConstants.EMPTY_END_ROW)
+ endKey = null;
+ else
+ endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
}
/**
@@ -334,12 +341,12 @@ public class TransactionManager {
LOG.error(msg + ":" + e);
throw new Exception(msg);
}
- if(result.size() != 1) {
+ if(result.size() == 0) {
LOG.error("doCommitX, received incorrect result size: " + result.size() + " txid: " + transactionId);
refresh = true;
retry = true;
}
- else {
+ else if(result.size() == 1){
// size is 1
for (CommitResponse cresponse : result.values()){
if(cresponse.getHasException()) {
@@ -361,6 +368,31 @@ public class TransactionManager {
}
retry = false;
}
+ else {
+ for (CommitResponse cresponse : result.values()){
+ if(cresponse.getHasException()) {
+ String exceptionString = new String (cresponse.getException().toString());
+ if (exceptionString.contains("UnknownTransactionException")) {
+ if (ignoreUnknownTransactionException == true) {
+ if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring UnknownTransactionException in cresponse");
+ }
+ else {
+ LOG.error("doCommitX, coprocessor UnknownTransactionException: " + cresponse.getException());
+ throw new UnknownTransactionException();
+ }
+ }
+ else if(exceptionString.contains("Asked to commit a non-pending transaction")) {
+ if (LOG.isTraceEnabled()) LOG.trace("doCommitX, ignoring 'commit non-pending transaction' in cresponse");
+ }
+ else {
+ if (LOG.isTraceEnabled()) LOG.trace("doCommitX coprocessor exception: " + cresponse.getException());
+ throw new Exception(cresponse.getException());
+ }
+ }
+ }
+ retry = false;
+ }
+
}
catch (UnknownTransactionException ute) {
LOG.error("Got unknown exception in doCommitX for transaction: " + transactionId + " " + ute);
@@ -368,16 +400,23 @@ public class TransactionManager {
throw new UnknownTransactionException();
}
catch (Exception e) {
- LOG.error("doCommitX retrying due to Exception: " + e);
- refresh = true;
- retry = true;
+ if(e.toString().contains("Asked to commit a non-pending transaction")) {
+ if (LOG.isDebugEnabled()) LOG.debug("doCommitX will not retry: " + e);
+ refresh = false;
+ retry = false;
+ }
+ else {
+ LOG.error("doCommitX retrying due to Exception: " + e);
+ refresh = true;
+ retry = true;
+ }
}
if (refresh) {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- String lv_node = lv_hrl.getHostname();
- int lv_length = lv_node.indexOf('.');
+ //String lv_node = lv_hrl.getHostname();
+ //int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + " endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -494,8 +533,6 @@ public class TransactionManager {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- String lv_node = lv_hrl.getHostname();
- int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -585,47 +622,89 @@ public class TransactionManager {
Map<byte[], CommitRequestResponse> result = null;
try {
-// if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- before coprocessorService txid: " + transactionId + " table: " + table.toString());
-// if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- txid: " + transactionId + " table: " + table.toString() + " endKey_Orig: " + new String(endKey_orig, "UTF-8"));
-// if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- " + table.toString() + " startKey: " + new String(startKey, "UTF-8") + " endKey: " + new String(endKey, "UTF-8"));
-
-// HRegionLocation lv_hrl = table.getRegionLocation(startKey);
-// HRegionInfo lv_hri = lv_hrl.getRegionInfo();
-// String lv_node = lv_hrl.getHostname();
-// int lv_length = lv_node.indexOf('.');
-
-// if ((location.getRegionInfo().getEncodedName().compareTo(lv_hri.getEncodedName()) != 0) || // Encoded name is different
-// (location.getHostname().regionMatches(0, lv_node, 0, lv_length) == false)) { // Node is different
-// if (LOG.isInfoEnabled())LOG.info("doPrepareX -- " + table.toString() + " location being refreshed");
-// if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- lv_hri: " + lv_hri);
-// if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location.getRegionInfo(): " + location.getRegionInfo());
-// if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- lv_node: " + lv_node + " lv_length: " + lv_length);
-// if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location.getHostname(): " + location.getHostname());
-// table.getRegionLocation(startKey, true);
-// }
result = table.coprocessorService(TrxRegionService.class, startKey, endKey, callable);
} catch (Throwable e) {
LOG.error("doPrepareX coprocessor error for " + Bytes.toString(regionName) + " txid: " + transactionId + ":" + e);
throw new CommitUnsuccessfulException("Unable to call prepare, coprocessor error");
}
- if(result.size() != 1) {
+ if(result.size() == 0) {
LOG.error("doPrepareX, received incorrect result size: " + result.size());
refresh = true;
retry = true;
}
- else {
+ else if(result.size() == 1){
// size is 1
for (CommitRequestResponse cresponse : result.values()){
// Should only be one result
int value = cresponse.getResult();
commitStatus = value;
if(cresponse.getHasException()) {
- if (LOG.isTraceEnabled()) LOG.trace("doPrepareX coprocessor exception: " + cresponse.getException());
- throw new Exception(cresponse.getException());
+ if(transactionState.hasRetried() &&
+ cresponse.getException().contains("encountered unknown transactionID")) {
+ retry = false;
+ commitStatus = TransactionalReturn.COMMIT_OK_READ_ONLY;
+ }
+ else {
+ if (LOG.isTraceEnabled()) LOG.trace("doPrepareX coprocessor exception: " + cresponse.getException());
+ throw new Exception(cresponse.getException());
+ }
+ }
+ if(value == TransactionalReturn.COMMIT_RESEND) {
+ // Handle situation where repeated region is in list due to different endKeys
+ int count = 0;
+ for(TransactionRegionLocation trl : this.transactionState.getParticipatingRegions()) {
+ if(trl.getRegionInfo().getTable().toString()
+ .compareTo(location.getRegionInfo().getTable().toString()) == 0
+ &&
+ Arrays.equals(trl.getRegionInfo().getStartKey(),
+ location.getRegionInfo().getStartKey())) {
+ count++;
+ }
+ }
+ if(count > 1) {
+ commitStatus = TransactionalReturn.COMMIT_OK;
+ retry = false;
+ }
+ else {
+ // Pause for split to complete and retry
+ Thread.sleep(100);
+ retry = true;
+ }
+ }
+ else {
+ retry = false;
}
}
- retry = false;
+ }
+ else {
+ for(CommitRequestResponse cresponse : result.values()) {
+ if(cresponse.getResult() == TransactionalReturn.COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR ||
+ cresponse.getResult() == TransactionalReturn.COMMIT_CONFLICT ||
+ cresponse.getResult() == TransactionalReturn.COMMIT_UNSUCCESSFUL ||
+ commitStatus == 0) {
+ commitStatus = cresponse.getResult();
+
+ if(cresponse.getHasException()) {
+ if(cresponse.getException().contains("encountered unknown transactionID")) {
+ retry = false;
+ commitStatus = TransactionalReturn.COMMIT_OK_READ_ONLY;
+ }
+ else {
+ if (LOG.isTraceEnabled()) LOG.trace("doPrepareX coprocessor exception: " +
+ cresponse.getException());
+ throw new Exception(cresponse.getException());
+ }
+ }
+ }
+ }
+
+ if(commitStatus == TransactionalReturn.COMMIT_OK ||
+ commitStatus == TransactionalReturn.COMMIT_OK_READ_ONLY ||
+ commitStatus == TransactionalReturn.COMMIT_RESEND) {
+ commitStatus = TransactionalReturn.COMMIT_OK;
+ }
+ retry = false;
}
}
catch(UnknownTransactionException ute) {
@@ -641,8 +720,6 @@ public class TransactionManager {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- String lv_node = lv_hrl.getHostname();
- int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -742,8 +819,6 @@ public class TransactionManager {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- String lv_node = lv_hrl.getHostname();
- int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -789,6 +864,7 @@ public class TransactionManager {
switch (commitStatus) {
case TransactionalReturn.COMMIT_OK:
break;
+ case TransactionalReturn.COMMIT_RESEND:
case TransactionalReturn.COMMIT_OK_READ_ONLY:
transactionState.addRegionToIgnore(location); // No need to doCommit for read-onlys
readOnly = true;
@@ -870,39 +946,45 @@ public class TransactionManager {
throw new Exception(msg);
}
- if(result.size() != 1) {
- LOG.error("doAbortX, received incorrect result size: " + result.size());
+ if(result.size() == 0) {
+ LOG.error("doAbortX, received 0 region results.");
refresh = true;
retry = true;
- }
- else {
- for (AbortTransactionResponse cresponse : result.values()) {
- if(cresponse.getHasException()) {
- String exceptionString = cresponse.getException().toString();
- LOG.error("Abort HasException true: " + exceptionString);
- if(exceptionString.contains("UnknownTransactionException")) {
- throw new UnknownTransactionException();
- }
- throw new Exception(cresponse.getException());
- }
}
- retry = false;
- }
+ else {
+ for (AbortTransactionResponse cresponse : result.values()) {
+ if(cresponse.getHasException()) {
+ String exceptionString = cresponse.getException().toString();
+ LOG.error("Abort HasException true: " + exceptionString);
+ if(exceptionString.contains("UnknownTransactionException")) {
+ throw new UnknownTransactionException();
+ }
+ throw new Exception(cresponse.getException());
+ }
+ }
+ retry = false;
}
+ }
catch (UnknownTransactionException ute) {
LOG.debug("UnknownTransactionException in doAbortX for transaction: " + transactionId + "(ignoring): " + ute);
}
catch (Exception e) {
- LOG.error("doAbortX retrying due to Exception: " + e );
- refresh = true;
- retry = true;
+ if(e.toString().contains("Asked to commit a non-pending transaction")) {
+ LOG.error("doCommitX will not retry: " + e);
+ refresh = false;
+ retry = false;
+ }
+ else {
+ LOG.error("doAbortX retrying due to Exception: " + e );
+ refresh = true;
+ retry = true;
+ }
+
}
if (refresh) {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- String lv_node = lv_hrl.getHostname();
- int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -998,8 +1080,6 @@ public class TransactionManager {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- String lv_node = lv_hrl.getHostname();
- int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -1098,8 +1178,6 @@ public class TransactionManager {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- String lv_node = lv_hrl.getHostname();
- int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doCommitX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -1179,8 +1257,6 @@ public class TransactionManager {
if (refresh) {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- String lv_node = lv_hrl.getHostname();
- int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doPrepareX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
@@ -1309,8 +1385,6 @@ public class TransactionManager {
if (refresh) {
HRegionLocation lv_hrl = table.getRegionLocation(startKey);
HRegionInfo lv_hri = lv_hrl.getRegionInfo();
- String lv_node = lv_hrl.getHostname();
- int lv_length = lv_node.indexOf('.');
if (LOG.isTraceEnabled()) LOG.trace("doAbortX -- location being refreshed : " + location.getRegionInfo().getRegionNameAsString() + "endKey: "
+ Hex.encodeHexString(location.getRegionInfo().getEndKey()) + " for transaction: " + transactionId);
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
index 45cf8c9..53c4951 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
@@ -88,6 +88,7 @@ public class TransactionState {
private boolean ddlTrans;
private static boolean useConcurrentHM = false;
private static boolean getCHMVariable = true;
+ private boolean hasRetried = false;
public Set<String> tableNames = Collections.synchronizedSet(new HashSet<String>());
public Set<TransactionRegionLocation> participatingRegions;
@@ -448,4 +449,13 @@ public class TransactionState {
public void setDDLTx(final boolean status) {
this.ddlTrans = status;
}
+
+ public void setRetried(boolean val) {
+ this.hasRetried = val;
+ }
+
+ public boolean hasRetried() {
+ return this.hasRetried;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java
index e3d3650..3bb2ea4 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalReturn.java
@@ -34,4 +34,5 @@ public interface TransactionalReturn {
final int COMMIT_UNSUCCESSFUL = 4;
/** Status code representing a transaction that cannot be committed due to conflict. */
final int COMMIT_CONFLICT = 5;
+ final int COMMIT_RESEND = 6;
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
index 5c67129..419664b 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
@@ -44,12 +44,16 @@
package org.apache.hadoop.hbase.client.transactional;
+import java.io.File;
+import java.util.Collection;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -61,6 +65,7 @@ import org.apache.commons.codec.binary.Hex;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -75,6 +80,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteRequest;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteResponse;
import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndPutRequest;
@@ -99,6 +105,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.regionserver.transactional.SingleVersionDeleteNotSupported;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.fs.Path;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
@@ -113,6 +120,9 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
static private HConnection connection = null;
static Configuration config = HBaseConfiguration.create();
static ExecutorService threadPool;
+ static int retries = 15;
+ static int delay = 1000;
+ private String retryErrMsg = "Coprocessor result is null, retries exhausted";
static {
config.set("hbase.hregion.impl", "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion");
@@ -194,27 +204,48 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
}
};
- Map<byte[], GetTransactionalResponse> result = null;
+ GetTransactionalResponse result = null;
try {
- result = super.coprocessorService(TrxRegionService.class, get.getRow(), get.getRow(), callable);
+ int retryCount = 0;
+ boolean retry = false;
+ do {
+ Iterator<Map.Entry<byte[], TrxRegionProtos.GetTransactionalResponse>> it = super.coprocessorService(TrxRegionService.class,
+ get.getRow(),
+ get.getRow(),
+ callable)
+ .entrySet().iterator();
+ if(it.hasNext()) {
+ result = it.next().getValue();
+ retry = false;
+ }
+
+ if(result == null || result.getException().contains("closing region")) {
+ Thread.sleep(TransactionalTable.delay);
+ retry = true;
+ transactionState.setRetried(true);
+ retryCount++;
+ }
+ } while (retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
e.printStackTrace();
throw new IOException("ERROR while calling coprocessor");
}
- Collection<GetTransactionalResponse> results = result.values();
+ //Collection<GetTransactionalResponse> results = result.values();
// Should only be one result, if more than one. Can't handle.
// Need to test whether '!=' or '>' is correct
- if (LOG.isTraceEnabled()) LOG.trace("Results count: " + results.size());
+ //if (LOG.isTraceEnabled()) LOG.trace("Results count: " + results.size());
//if(results.size() != 1)
// throw new IOException("Incorrect number of results from coprocessor call");
- GetTransactionalResponse[] resultArray = new GetTransactionalResponse[results.size()];
- results.toArray(resultArray);
- if(resultArray.length == 0)
- throw new IOException("Problem with calling coprocessor, no regions returned result");
+ //GetTransactionalResponse[] resultArray = new GetTransactionalResponse[results.size()];
+ //results.toArray(resultArray);
+ //if(resultArray.length == 0)
+ // throw new IOException("Problem with calling coprocessor, no regions returned result");
- if(resultArray[0].hasException())
- throw new IOException(resultArray[0].getException());
- return ProtobufUtil.toResult(resultArray[0].getResult());
+ if(result == null)
+ throw new IOException(retryErrMsg);
+ else if(result.hasException())
+ throw new IOException(result.getException());
+ return ProtobufUtil.toResult(result.getResult());
}
/**
@@ -254,10 +285,28 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
};
byte[] row = delete.getRow();
- Map<byte[], DeleteTransactionalResponse> result = null;
+ DeleteTransactionalResponse result = null;
try {
- result = super.coprocessorService(TrxRegionService.class, row, row, callable);
-
+ int retryCount = 0;
+ boolean retry = false;
+ do {
+ Iterator<Map.Entry<byte[], DeleteTransactionalResponse>> it = super.coprocessorService(TrxRegionService.class,
+ row,
+ row,
+ callable)
+ .entrySet().iterator();
+ if(it.hasNext()) {
+ result = it.next().getValue();
+ retry = false;
+ }
+
+ if(result == null || result.getException().contains("closing region")) {
+ Thread.sleep(TransactionalTable.delay);
+ retry = true;
+ transactionState.setRetried(true);
+ retryCount++;
+ }
+ } while (retryCount < TransactionalTable.retries && retry == true);
} catch (ServiceException e) {
e.printStackTrace();
throw new IOException();
@@ -265,16 +314,11 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
t.printStackTrace();
throw new IOException();
}
- Collection<DeleteTransactionalResponse> results = result.values();
- //GetTransactionalResponse[] resultArray = (GetTransactionalResponse[]) results.toArray();
- DeleteTransactionalResponse[] resultArray = new DeleteTransactionalResponse[results.size()];
- results.toArray(resultArray);
-
- if(resultArray.length == 0)
- throw new IOException("Problem with calling coprocessor, no regions returned result");
- if(resultArray[0].hasException())
- throw new IOException(resultArray[0].getException());
+ if(result == null)
+ throw new IOException(retryErrMsg);
+ else if(result.hasException())
+ throw new IOException(result.getException());
}
/**
@@ -318,21 +362,37 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
return rpcCallback.get();
}
};
- Map<byte[], PutTransactionalResponse> result = null;
+ PutTransactionalResponse result = null;
try {
- result = super.coprocessorService(TrxRegionService.class, put.getRow(), put.getRow(), callable);
+ int retryCount = 0;
+ boolean retry = false;
+ do {
+ Iterator<Map.Entry<byte[], PutTransactionalResponse>> it= super.coprocessorService(TrxRegionService.class,
+ put.getRow(),
+ put.getRow(),
+ callable)
+ .entrySet().iterator();
+ if(it.hasNext()) {
+ result = it.next().getValue();
+ retry = false;
+ }
+
+ if(result == null || result.getException().contains("closing region")) {
+ Thread.sleep(TransactionalTable.delay);
+ retry = true;
+ transactionState.setRetried(true);
+ retryCount++;
+ }
+
+ } while(retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
e.printStackTrace();
throw new IOException("ERROR while calling coprocessor");
}
- Collection<PutTransactionalResponse> results = result.values();
- PutTransactionalResponse[] resultArray = new PutTransactionalResponse[results.size()];
- results.toArray(resultArray);
- if(resultArray.length == 0)
- throw new IOException("Problem with calling coprocessor, no regions returned result");
-
- if(resultArray[0].hasException())
- throw new IOException(resultArray[0].getException());
+ if(result == null)
+ throw new IOException(retryErrMsg);
+ else if(result.hasException())
+ throw new IOException(result.getException());
// put is void, may not need to check result
if (LOG.isTraceEnabled()) LOG.trace("TransactionalTable.put EXIT");
@@ -394,22 +454,38 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
}
};
- Map<byte[], CheckAndDeleteResponse> result = null;
+ CheckAndDeleteResponse result = null;
try {
- result = super.coprocessorService(TrxRegionService.class, delete.getRow(), delete.getRow(), callable);
+ int retryCount = 0;
+ boolean retry = false;
+ do {
+ Iterator<Map.Entry<byte[], CheckAndDeleteResponse>> it = super.coprocessorService(TrxRegionService.class,
+ delete.getRow(),
+ delete.getRow(),
+ callable)
+ .entrySet()
+ .iterator();
+ if(it.hasNext()) {
+ result = it.next().getValue();
+ retry = false;
+ }
+
+ if(result == null || result.getException().contains("closing region")) {
+ Thread.sleep(TransactionalTable.delay);
+ retry = true;
+ transactionState.setRetried(true);
+ retryCount++;
+ }
+ } while (retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
e.printStackTrace();
throw new IOException("ERROR while calling coprocessor");
}
-
- Collection<CheckAndDeleteResponse> results = result.values();
-
- if(results.size() == 0)
- throw new IOException("Problem with calling coprocessor, no regions returned result");
- CheckAndDeleteResponse response = results.iterator().next();
- if(response.hasException())
- throw new IOException(response.getException());
- return response.getResult();
+ if(result == null)
+ throw new IOException(retryErrMsg);
+ else if(result.hasException())
+ throw new IOException(result.getException());
+ return result.getResult();
}
public boolean checkAndPut(final TransactionState transactionState,
@@ -460,23 +536,42 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
}
};
- Map<byte[], CheckAndPutResponse> result = null;
+ CheckAndPutResponse result = null;
try {
- result = super.coprocessorService(TrxRegionService.class, put.getRow(), put.getRow(), callable);
+ int retryCount = 0;
+ boolean retry = false;
+ do {
+ Iterator<Map.Entry<byte[], CheckAndPutResponse>> it = super.coprocessorService(TrxRegionService.class,
+ put.getRow(),
+ put.getRow(),
+ callable)
+ .entrySet()
+ .iterator();
+ if(it.hasNext()) {
+ result = it.next().getValue();
+ retry = false;
+ }
+
+ if(result == null || result.getException().contains("closing region")) {
+ Thread.sleep(TransactionalTable.delay);
+ retry = true;
+ transactionState.setRetried(true);
+ retryCount++;
+ }
+ } while (retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
- //sw.toString();
throw new IOException("ERROR while calling coprocessor " + sw.toString());
}
- Collection<CheckAndPutResponse> results = result.values();
- if(results.size() == 0)
- throw new IOException("Problem with calling coprocessor, no regions returned result");
- CheckAndPutResponse response = results.iterator().next();
- if(response.hasException())
- throw new IOException(response.getException());
- return response.getResult();
+
+ if(result == null)
+ throw new IOException(retryErrMsg);
+ else if(result.hasException())
+ throw new IOException(result.getException());
+
+ return result.getResult();
}
/**
@@ -541,28 +636,38 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
}
};
- Map<byte[], DeleteMultipleTransactionalResponse> result = null;
+ DeleteMultipleTransactionalResponse result = null;
try {
- result = super.coprocessorService(TrxRegionService.class,
- entry.getValue().get(0).getRow(),
- entry.getValue().get(0).getRow(),
- callable);
+ int retryCount = 0;
+ boolean retry = false;
+ do {
+ Iterator<Map.Entry<byte[], DeleteMultipleTransactionalResponse>> it= super.coprocessorService(TrxRegionService.class,
+ entry.getValue().get(0).getRow(),
+ entry.getValue().get(0).getRow(),
+ callable)
+ .entrySet().iterator();
+ if(it.hasNext()) {
+ result = it.next().getValue();
+ retry = false;
+ }
+
+ if(result == null || result.getException().contains("closing region")) {
+ Thread.sleep(TransactionalTable.delay);
+ retry = true;
+ transactionState.setRetried(true);
+ retryCount++;
+ }
+ } while (retryCount < TransactionalTable.retries && retry == true);
+
} catch (Throwable e) {
e.printStackTrace();
throw new IOException("ERROR while calling coprocessor");
}
- if(result.size() > 1) {
- LOG.error("result size for multiple delete:" + result.size());
- throw new IOException("Incorrect number of region results");
- }
- Collection<DeleteMultipleTransactionalResponse> results = result.values();
- DeleteMultipleTransactionalResponse[] resultArray = new DeleteMultipleTransactionalResponse[results.size()];
- results.toArray(resultArray);
- if(resultArray.length == 0)
- throw new IOException("Problem with calling coprocessor, no regions returned result");
-
- if (resultArray[0].hasException())
- throw new IOException(resultArray[0].getException());
+
+ if(result == null)
+ throw new IOException(retryErrMsg);
+ else if (result.hasException())
+ throw new IOException(result.getException());
}
}
@@ -627,24 +732,36 @@ public class TransactionalTable extends HTable implements TransactionalTableClie
return rpcCallback.get();
}
};
- Map<byte[], PutMultipleTransactionalResponse> result = null;
+ PutMultipleTransactionalResponse result = null;
try {
- result = super.coprocessorService(TrxRegionService.class,
- entry.getValue().get(0).getRow(),
- entry.getValue().get(0).getRow(),
- callable);
+ int retryCount = 0;
+ boolean retry = false;
+ do {
+ Iterator<Map.Entry<byte[], PutMultipleTransactionalResponse>> it= super.coprocessorService(TrxRegionService.class,
+ entry.getValue().get(0).getRow(),
+ entry.getValue().get(0).getRow(),
+ callable)
+ .entrySet().iterator();
+ if(it.hasNext()) {
+ result = it.next().getValue();
+ retry = false;
+ }
+
+ if(result == null || result.getException().contains("closing region")) {
+ Thread.sleep(TransactionalTable.delay);
+ retry = true;
+ transactionState.setRetried(true);
+ retryCount++;
+ }
+ } while (retryCount < TransactionalTable.retries && retry == true);
} catch (Throwable e) {
e.printStackTrace();
throw new IOException("ERROR while calling coprocessor");
}
- Collection<PutMultipleTransactionalResponse> results = result.values();
- PutMultipleTransactionalResponse[] resultArray = new PutMultipleTransactionalResponse[results.size()];
- results.toArray(resultArray);
- if(resultArray.length == 0)
- throw new IOException("Problem with calling coprocessor, no regions returned result");
-
- if (resultArray[0].hasException())
- throw new IOException(resultArray[0].getException());
+ if(result == null)
+ throw new IOException(retryErrMsg);
+ else if (result.hasException())
+ throw new IOException(result.getException());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
new file mode 100644
index 0000000..1107bdc
--- /dev/null
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/SplitBalanceHelper.java
@@ -0,0 +1,296 @@
+package org.apache.hadoop.hbase.coprocessor.transactional;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionScannerHolder;
+import org.apache.hadoop.hbase.regionserver.transactional.TrxTransactionState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+public class SplitBalanceHelper {
+ private static final Log LOG = LogFactory.getLog(SplitBalanceHelper.class);
+
+ private Path flushPath; // file under the region directory named FLUSH_PATH + construction timestamp
+
+ private static String zkTable = "/hbase/table"; // znode whose children zkCleanup() treats as the existing HBase tables
+ private static String zSplitBalPath = TrxRegionObserver.zTrafPath + "splitbalance/"; // coordination root, with trailing '/'
+ private static String zSplitBalPathNoSlash = TrxRegionObserver.zTrafPath + "splitbalance"; // same root, usable as a znode name
+ private static String SPLIT = "SPLIT"; // child znode name marking a split
+ private static String BALANCE = "BALANCE"; // child znode name marking a balance
+ private static final String FLUSH_PATH = "traf.txn.out"; // prefix of the flush file name
+ private static AtomicBoolean needsCleanup = new AtomicBoolean(true); // true until the first constructor call triggers zkCleanup()
+ private String balancePath; // regionPath + "/BALANCE/"; users strip the trailing '/'
+ private String splitPath; // regionPath + "/SPLIT/"; users strip the trailing '/'
+ private String regionPath; // zSplitBalPath + table name + "/" + encoded region name
+
+ private ZooKeeperWatcher zkw;
+ private HRegionInfo hri; // region info of the region passed to the constructor
+ private HRegion region;
+ private String tablename;
+
+
+ public SplitBalanceHelper(HRegion my_Region, ZooKeeperWatcher zkw) {
+     // The flush file name is stamped with the construction time.
+     final String flushFileName = FLUSH_PATH + getTimeStamp();
+     this.zkw = zkw;
+     this.region = my_Region;
+     this.hri = this.region.getRegionInfo();
+     this.tablename = this.region.getTableDesc().getNameAsString();
+     try {
+         // Make sure the shared split/balance root znode exists; a ZooKeeper failure is only logged.
+         if (ZKUtil.checkExists(this.zkw, zSplitBalPathNoSlash) == -1) { ZKUtil.createWithParents(this.zkw, zSplitBalPathNoSlash); }
+     } catch (KeeperException zkError) {
+         LOG.error("ERROR: Zookeeper exception: "+ zkError);
+     }
+     this.flushPath = new Path(this.region.getRegionFileSystem().getRegionDir(), flushFileName);
+     this.regionPath = zSplitBalPath + this.tablename + "/" + this.hri.getEncodedName();
+     this.balancePath = this.regionPath + "/" + BALANCE + "/";
+     this.splitPath = this.regionPath + "/" + SPLIT + "/";
+     // needsCleanup flips from true to false exactly once, so only the first instance runs zkCleanup().
+     if (SplitBalanceHelper.needsCleanup.compareAndSet(true, false)) {
+         zkCleanup();
+     }
+ }
+
+ public Path getPath() { // accessor for the flush-file path built in the constructor
+ return flushPath;
+ }
+
+ public boolean getSplit() { // existence check only: passes null so no path text is collected
+ return getSplit(null);
+ }
+
+ public boolean getSplit(StringBuilder path) {
+     // True when the SPLIT znode for this region holds data; that data (the flush-file path
+     // written by setSplit) is appended to 'path' unless 'path' is null.
+     try {
+         byte [] splPath = ZKUtil.getData(zkw, splitPath.substring(0,splitPath.length()-1)); // strip trailing '/'
+         if (splPath == null) return false;
+         // Decode the stored bytes; byte[].toString() would only yield an "[B@..." identity string.
+         String flushFile = Bytes.toString(splPath);
+         if (path != null)
+             path.append(flushFile);
+         if(LOG.isDebugEnabled()) LOG.debug("Split information retrieved, path is: " + flushFile);
+         return true;
+     } catch (Exception e) {
+         if(LOG.isErrorEnabled()) LOG.error("Keeper exception: " + e);
+         return false;
+     }
+ }
+
+ public void setSplit(HRegion leftRegion, HRegion rightRegion) throws IOException {
+     // Daughter nodes go under the table name, i.e. the same <root>/<table>/<encoded region>
+     // layout the constructor uses for regionPath, so a daughter's getSplit() can find them.
+     String zLeftKey = zSplitBalPath + this.tablename + "/" + leftRegion.getRegionInfo().getEncodedName();
+     String zRightKey = zSplitBalPath + this.tablename + "/" + rightRegion.getRegionInfo().getEncodedName();
+     try {
+         if(ZKUtil.checkExists(zkw, balancePath.substring(0,balancePath.length()-1)) != -1) {
+             clearBalance(); // an existing BALANCE marker for this region is dropped first
+         }
+         if (ZKUtil.checkExists(zkw, zLeftKey) == -1)
+             ZKUtil.createWithParents(zkw, zLeftKey);
+         if (ZKUtil.checkExists(zkw, zRightKey) == -1)
+             ZKUtil.createWithParents(zkw, zRightKey);
+         // Both daughters are pointed at this helper's flush file.
+         ZKUtil.createAndFailSilent(zkw, zLeftKey + "/" + SPLIT, Bytes.toBytes(flushPath.toString()));
+         ZKUtil.createAndFailSilent(zkw, zRightKey + "/" + SPLIT, Bytes.toBytes(flushPath.toString()));
+         if(LOG.isDebugEnabled()) LOG.debug("Split coordination node written for " + leftRegion.getRegionNameAsString() +
+                 " and " + rightRegion.getRegionNameAsString());
+     } catch (KeeperException ke) {
+         LOG.error("ERROR: Zookeeper exception: "+ ke); // ZooKeeper failures are logged, not rethrown
+     }
+ }
+
+ public void setSplit() {
+     // Writes this helper's flush-file path into the region's SPLIT znode.
+     // balancePath and splitPath carry a trailing '/'; the znode names are taken without it.
+     final String balanceNode = balancePath.substring(0, balancePath.length()-1);
+     final String splitNode = splitPath.substring(0, splitPath.length()-1);
+     try {
+         boolean balancePending = ZKUtil.checkExists(zkw, balanceNode) != -1;
+         if (balancePending) clearBalance();
+         boolean splitMissing = ZKUtil.checkExists(zkw, splitNode) == -1;
+         if (splitMissing) ZKUtil.createWithParents(zkw, splitNode);
+         // The node's data is the flush-file path that getSplit() later hands back.
+         ZKUtil.createSetData(zkw, splitNode, Bytes.toBytes(flushPath.toString()));
+         if (LOG.isDebugEnabled()) { LOG.debug("Setting split coordination node for " + hri.getRegionNameAsString()); }
+     } catch (KeeperException zkError) {
+         LOG.error("ERROR: Zookeeper exception: "+ zkError);
+     }
+ }
+
+ public void clearSplit() {
+     if (LOG.isTraceEnabled()) { LOG.trace("clearSplit called for region: " + this.hri.getRegionNameAsString()); }
+     try {
+         ZKUtil.deleteNodeRecursively(this.zkw, this.regionPath); // targets the whole region node, not only its SPLIT child
+     } catch (KeeperException zkError) {
+         LOG.error("Zookeeper exception: " + zkError); // logged only; nothing is rethrown
+     }
+ }
+
+ public boolean getBalance(StringBuilder path) {
+     // True only when the BALANCE znode for this region holds data; that data (the flush-file
+     // path written by setBalance) is appended to 'path' unless 'path' is null.
+     try {
+         byte [] balPath = ZKUtil.getData(zkw, balancePath.substring(0, balancePath.length()-1));
+         if (balPath == null)
+             return false;
+         String flushFile = Bytes.toString(balPath); // decode the way Bytes.toBytes encoded it, not via the platform charset
+         if (path != null) path.append(flushFile);
+         if(LOG.isDebugEnabled()) LOG.debug("Balance information retrieved, path is: " + flushFile);
+         return true;
+     } catch (Exception e) {
+         if(LOG.isErrorEnabled()) LOG.error("Keeper exception: " + e);
+         return false; // a failed lookup must not be reported as balance information being present
+     }
+ }
+
+ public void setBalance() throws IOException {
+     // splitPath and balancePath carry a trailing '/'; the znode names are taken without it.
+     final String splitNode = splitPath.substring(0, splitPath.length()-1);
+     final String balanceNode = balancePath.substring(0, balancePath.length()-1);
+     try {
+         // A region that already has a SPLIT node is refused a BALANCE node.
+         boolean splitPending = ZKUtil.checkExists(zkw, splitNode) != -1;
+         if (splitPending) throw new IOException("SPLIT node already exists when trying to add BALANCE node");
+         boolean balanceMissing = ZKUtil.checkExists(zkw, balanceNode) == -1;
+         if (balanceMissing) ZKUtil.createWithParents(zkw, balanceNode);
+         // The node's data is the flush-file path that getBalance() later hands back.
+         ZKUtil.createSetData(zkw, balanceNode, Bytes.toBytes(flushPath.toString()));
+         if (LOG.isDebugEnabled()) { LOG.debug("Setting balance coordination node for " + hri.getRegionNameAsString()); }
+     } catch (KeeperException zkError) {
+         LOG.error("ERROR: Zookeeper exception: "+ zkError);
+     }
+ }
+
+ public void clearBalance() {
+     if (LOG.isTraceEnabled()) { LOG.trace("clearBalance called for region: " + this.hri.getRegionNameAsString()); }
+     try {
+         ZKUtil.deleteNodeRecursively(this.zkw, this.regionPath); // targets the whole region node, not only its BALANCE child
+     } catch (KeeperException zkError) {
+         LOG.error("Zookeeper exception: " + zkError); // logged only; nothing is rethrown
+     }
+ }
+
+ private long getTimeStamp() { // current wall-clock milliseconds; the constructor appends it to the flush file name
+ return System.currentTimeMillis();
+ }
+
+ protected boolean pendingListClear(Set<TrxTransactionState> commitPendingTransactions) throws IOException { // true when the given set is empty
+ return commitPendingTransactions.isEmpty();
+ }
+
+ protected boolean scannersListClear(ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scanners) throws IOException { // true when the given map is empty
+ return scanners.isEmpty();
+ }
+
+ protected void pendingWait(Set<TrxTransactionState> commitPendingTransactions, int pendingDelayLen) throws IOException {
+     int count = 1; // poll counter shown in the debug log; it only advances while debug logging is on
+     while(!pendingListClear(commitPendingTransactions)) { // sleep pendingDelayLen ms per poll until the set is empty
+         try {
+             if(LOG.isDebugEnabled()) LOG.debug("pendingWait() delay, count " + count++ + " on: " + hri.getRegionNameAsString());
+             Thread.sleep(pendingDelayLen);
+         } catch(InterruptedException e) {
+             Thread.currentThread().interrupt(); // restore the interrupt flag that sleep() cleared
+             if(LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on preSplit delay, returning. " + e);
+             throw new IOException("Problem while calling sleep() on pendingWait delay, " + e, e); // keep the cause
+         }
+     }
+ }
+
+ protected void scannersWait(ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scanners, int pendingDelayLen)
+         throws IOException {
+     int count = 1; // poll counter shown in the debug log; it only advances while debug logging is on
+     while(!scannersListClear(scanners)) { // sleep pendingDelayLen ms per poll until the map is empty
+         try {
+             if(LOG.isDebugEnabled()) LOG.debug("scannersWait() delay, count " + count++ + " on: " + hri.getRegionNameAsString());
+             Thread.sleep(pendingDelayLen);
+         } catch(InterruptedException e) {
+             Thread.currentThread().interrupt(); // restore the interrupt flag that sleep() cleared
+             if(LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on preSplit delay, returning. " + e);
+             throw new IOException("Problem while calling sleep() on scannersWait delay, " + e, e); // keep the cause
+         }
+     }
+ }
+
+ protected void pendingAndScannersWait(Set<TrxTransactionState> commitPendingTransactions,
+         ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scanners,
+         int pendingDelayLen) throws IOException {
+     int count = 1; // poll counter shown in the debug log; it only advances while debug logging is on
+     while(!scannersListClear(scanners) || !pendingListClear(commitPendingTransactions)) { // wait while EITHER is non-empty; '&&' stopped as soon as one drained
+         try {
+             if(LOG.isDebugEnabled()) LOG.debug("pendingAndScannersWait() delay, count " + count++ + " on: " + hri.getRegionNameAsString());
+             Thread.sleep(pendingDelayLen);
+         } catch(InterruptedException e) {
+             Thread.currentThread().interrupt(); // restore the interrupt flag that sleep() cleared
+             if(LOG.isErrorEnabled()) LOG.error("Problem while calling sleep() on pendingAndScannersWait delay, returning. " + e);
+             throw new IOException("Problem while calling sleep() on pendingAndScannersWait delay, " + e, e); // keep the cause
+         }
+     }
+ }
+
+ protected void activeWait(ConcurrentHashMap<String, TrxTransactionState> transactionsById, int activeDelayLen, int splitDelayLimit) throws IOException {
+     // Polls every activeDelayLen ms until transactionsById is empty, and stops early once
+     // the accumulated sleep time reaches splitDelayLimit whole minutes.
+     int counter = 0;
+     int minutes = 0;
+     int currentMin = 0;
+     boolean delayMsg = false; // set once a wait has started; cleared again when the limit ends the wait
+     while(!transactionsById.isEmpty()) {
+         try {
+             delayMsg = true;
+             Thread.sleep(activeDelayLen);
+             counter++;
+             // Multiply in long: counter * activeDelayLen can exceed the int range on long waits.
+             currentMin = (int) (((long) counter * activeDelayLen) / 60000L);
+             if(currentMin > minutes) {
+                 minutes = currentMin;
+                 if (LOG.isInfoEnabled()) LOG.info("Delaying split due to transactions present. Delayed : " +
+                         minutes + " minute(s) on " + hri.getRegionNameAsString());
+             }
+             if(minutes >= splitDelayLimit) {
+                 if(LOG.isWarnEnabled()) LOG.warn("Surpassed split delay limit of " + splitDelayLimit
+                         + " minutes. Continuing with split");
+                 delayMsg = false;
+                 break;
+             }
+         } catch (InterruptedException e) {
+             Thread.currentThread().interrupt(); // restore the interrupt flag that sleep() cleared
+             String error = "Problem while calling sleep() on preSplit delay - activeWait: " + e;
+             if(LOG.isErrorEnabled()) LOG.error(error);
+             throw new IOException(error, e); // keep the interrupt as the cause
+         }
+     }
+     if(delayMsg && LOG.isWarnEnabled()) LOG.warn("Continuing with split operation, no active transactions on: " + hri.getRegionNameAsString());
+ }
+
+ protected void zkCleanup() {
+     if(LOG.isTraceEnabled()) LOG.trace("zkCleanup -- ENTRY");
+     try {
+         List<String> trafTables = ZKUtil.listChildrenNoWatch(zkw, zSplitBalPathNoSlash);
+         List<String> hbaseTables = ZKUtil.listChildrenNoWatch(zkw, SplitBalanceHelper.zkTable);
+         // A null list means that znode is missing; nothing can be compared then, so nothing is removed.
+         if(trafTables == null || hbaseTables == null) trafTables = new java.util.ArrayList<String>();
+         for(String tableName : trafTables) {
+             if(hbaseTables.contains(tableName)) continue; // table still listed by HBase, keep its nodes
+             if(LOG.isTraceEnabled()) LOG.trace("zkCleanup, removing " + zSplitBalPath + tableName);
+             ZKUtil.deleteNodeRecursively(zkw, zSplitBalPath + tableName);
+         }
+     } catch(KeeperException ke) {
+         if(LOG.isErrorEnabled()) LOG.error("zkCleanup error, please check your ZooKeeper: " + ke);
+     }
+     if(LOG.isTraceEnabled()) LOG.trace("zkCleanup -- EXIT");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/65d0f1cf/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
index e426159..2e64c12 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/coprocessor/transactional/TrxRegionEndpoint.java
@@ -91,6 +91,18 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ProtoUtil;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionPersist;
+import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.TransactionStateMsg;
+import com.google.protobuf.CodedInputStream;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -136,10 +148,12 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Mutation;
@@ -268,7 +282,7 @@ CoprocessorService, Coprocessor {
// Concurrent map for transactional region scanner holders
// Protected by synchronized methods
- final ConcurrentHashMap<Long,
+ private ConcurrentHashMap<Long,
TransactionalRegionScannerHolder> scanners =
new ConcurrentHashMap<Long, TransactionalRegionScannerHolder>();
@@ -341,6 +355,9 @@ CoprocessorService, Coprocessor {
String lv_hostName;
int lv_port;
private static String zNodePath = "/hbase/Trafodion/recovery/";
+ private static final String COMMITTED_TXNS_KEY = "1_COMMITED_TXNS_KEY";
+ private static final String TXNS_BY_ID_KEY = "2_TXNS_BY_ID_KEY";
+ private HFileContext context = new HFileContextBuilder().withIncludesTags(false).build();
private static final int MINIMUM_LEASE_TIME = 7200 * 1000;
private static final int LEASE_CHECK_FREQUENCY = 1000;
@@ -367,12 +384,14 @@ CoprocessorService, Coprocessor {
private static float memoryPercentage = 0;
private static boolean memoryThrottle = false;
private static boolean suppressOutOfOrderProtocolException = DEFAULT_SUPPRESS_OOP;
+ private Configuration config;
// Transaction state defines
private static final int COMMIT_OK = 1;
private static final int COMMIT_OK_READ_ONLY = 2;
private static final int COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR = 3;
private static final int COMMIT_CONFLICT = 5;
+ private static final int COMMIT_RESEND = 6;
private static final int CLOSE_WAIT_ON_COMMIT_PENDING = 1000;
private static final int MAX_COMMIT_PENDING_WAITS = 10;
@@ -392,7 +411,7 @@ CoprocessorService, Coprocessor {
public static final String trxkeyEPCPinstance = "EPCPinstance";
// TBD Maybe we should just use HashMap to improve the performance, ConcurrentHashMap could be too strict
- static ConcurrentHashMap<String, Object> transactionsEPCPMap = new ConcurrentHashMap<String, Object>();
+ static ConcurrentHashMap<String, Object> transactionsEPCPMap;
// TrxRegionService methods
@@ -839,9 +858,11 @@ CoprocessorService, Coprocessor {
} catch (UnknownTransactionException u) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught UnknownTransactionException after internal commitRequest call - " + u.toString());
ute = u;
+ status = COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
} catch (IOException e) {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest - txId " + transactionId + ", Caught IOException after internal commitRequest call - "+ e.toString());
ioe = e;
+ status = COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
}
}
@@ -2851,6 +2872,13 @@ CoprocessorService, Coprocessor {
return this;
}
+ static public ConcurrentHashMap<String, Object> getRegionMap() { // returns the shared static map, creating it on first use
+ if (transactionsEPCPMap == null) { // NOTE(review): unsynchronized check-then-create; two threads could each create a map - confirm callers
+ transactionsEPCPMap = new ConcurrentHashMap<String, Object>();
+ }
+ return transactionsEPCPMap;
+ }
+
/**
* Stores a reference to the coprocessor environment provided by the
* {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost}
@@ -2864,6 +2892,9 @@ CoprocessorService, Coprocessor {
*/
@Override
public void start(CoprocessorEnvironment env) throws IOException {
+ if (transactionsEPCPMap == null)
+ transactionsEPCPMap = new ConcurrentHashMap<String, Object>();
+
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
@@ -2878,30 +2909,30 @@ CoprocessorService, Coprocessor {
this.t_Region = (TransactionalRegion) tmp_env.getRegion();
this.fs = this.m_Region.getFilesystem();
- org.apache.hadoop.conf.Configuration conf = tmp_env.getConfiguration();
+ this.config = tmp_env.getConfiguration();
synchronized (stoppableLock) {
try {
- this.transactionLeaseTimeout = conf.getInt(LEASE_CONF, MINIMUM_LEASE_TIME);
+ this.transactionLeaseTimeout = config.getInt(LEASE_CONF, MINIMUM_LEASE_TIME);
if (this.transactionLeaseTimeout < MINIMUM_LEASE_TIME) {
if (LOG.isWarnEnabled()) LOG.warn("Transaction lease time: " + this.transactionLeaseTimeout + ", was less than the minimum lease time. Now setting the timeout to the minimum default value: " + MINIMUM_LEASE_TIME);
this.transactionLeaseTimeout = MINIMUM_LEASE_TIME;
}
- this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(conf,
+ this.scannerLeaseTimeoutPeriod = HBaseConfiguration.getInt(config,
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
- this.scannerThreadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
+ this.scannerThreadWakeFrequency = config.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
- this.cleanTimer = conf.getInt(SLEEP_CONF, DEFAULT_SLEEP);
- this.memoryUsageThreshold = conf.getInt(MEMORY_THRESHOLD, DEFAULT_MEMORY_THRESHOLD);
- this.memoryUsagePerformGC = conf.getBoolean(MEMORY_PERFORM_GC, DEFAULT_MEMORY_PERFORM_GC);
- this.memoryUsageWarnOnly = conf.getBoolean(MEMORY_WARN_ONLY, DEFAULT_MEMORY_WARN_ONLY);
- this.memoryUsageTimer = conf.getInt(MEMORY_CONF, DEFAULT_MEMORY_SLEEP);
- this.memoryUsageTimer = conf.getInt(MEMORY_CONF, DEFAULT_MEMORY_SLEEP);
+ this.cleanTimer = config.getInt(SLEEP_CONF, DEFAULT_SLEEP);
+ this.memoryUsageThreshold = config.getInt(MEMORY_THRESHOLD, DEFAULT_MEMORY_THRESHOLD);
+ this.memoryUsagePerformGC = config.getBoolean(MEMORY_PERFORM_GC, DEFAULT_MEMORY_PERFORM_GC);
+ this.memoryUsageWarnOnly = config.getBoolean(MEMORY_WARN_ONLY, DEFAULT_MEMORY_WARN_ONLY);
+ this.memoryUsageTimer = config.getInt(MEMORY_CONF, DEFAULT_MEMORY_SLEEP);
+ this.memoryUsageTimer = config.getInt(MEMORY_CONF, DEFAULT_MEMORY_SLEEP);
- this.suppressOutOfOrderProtocolException = conf.getBoolean(SUPPRESS_OOP, DEFAULT_SUPPRESS_OOP);
+ this.suppressOutOfOrderProtocolException = config.getBoolean(SUPPRESS_OOP, DEFAULT_SUPPRESS_OOP);
if (this.transactionLeases == null)
this.transactionLeases = new Leases(LEASE_CHECK_FREQUENCY);
@@ -3076,6 +3107,16 @@ CoprocessorService, Coprocessor {
transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyClosingVar,
this.closing);
}
+ ConcurrentHashMap<Long,TransactionalRegionScannerHolder> scannersCheck =
+ (ConcurrentHashMap<Long,TransactionalRegionScannerHolder>)transactionsByIdTestz
+ .get(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyScanners);
+ if(scannersCheck != null) {
+ this.scanners = scannersCheck;
+ }
+ else {
+ transactionsByIdTestz.put(this.m_Region.getRegionNameAsString()+TrxRegionObserver.trxkeyScanners,
+ this.scanners);
+ }
// Set up the memoryBean from the ManagementFactory
if (memoryUsageThreshold < DEFAULT_MEMORY_THRESHOLD)
@@ -4059,7 +4100,6 @@ CoprocessorService, Coprocessor {
throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: beginTransaction -- ENTRY txId: " + transactionId);
- checkClosing(transactionId);
// TBD until integration with recovery
if (reconstructIndoubts == 0) {
@@ -4174,6 +4214,7 @@ CoprocessorService, Coprocessor {
if (LOG.isTraceEnabled()) LOG.trace("Enter TrxRegionEndpoint coprocessor: beginTransIfNotExist, txid: "
+ transactionId + " transactionsById size: "
+ transactionsById.size());
+ checkClosing(transactionId);
String key = getTransactionalUniqueId(transactionId);
synchronized (transactionsById) {
@@ -4303,8 +4344,10 @@ CoprocessorService, Coprocessor {
}
// may change to indicate a NOTFOUND case then depends on the TM ts state, if reinstated tx, ignore the exception
if (state == null) {
- if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitRequest encountered unknown transactionID txId: " + transactionId + " returning COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR");
- return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
+ String errMsg = "TrxRegionEndpoint coprocessor: commitRequest encountered unknown transactionID txId: " + transactionId;
+ if (LOG.isTraceEnabled()) LOG.trace(errMsg);
+ //return COMMIT_UNSUCCESSFUL_FROM_COPROCESSOR;
+ throw new UnknownTransactionException(errMsg);
}
if (LOG.isInfoEnabled())
@@ -4503,6 +4546,8 @@ CoprocessorService, Coprocessor {
// Otherwise we were read-only and commitable, so we can forget it.
state.setStatus(Status.COMMITED);
+ if(state.getSplitRetry())
+ return COMMIT_RESEND;
retireTransaction(state, true);
if (LOG.isDebugEnabled()) LOG.debug("TrxRegionEndpoint coprocessor: commitRequest READ ONLY -- EXIT txId: " + transactionId);
return COMMIT_OK_READ_ONLY;
@@ -4673,7 +4718,7 @@ CoprocessorService, Coprocessor {
if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible -- ENTRY txId: " + transactionId + " COMMIT_OK");
return true;
} catch (Throwable e) {
- LOG.error("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", Caught exception after internal commit call "
+ if (LOG.isTraceEnabled()) LOG.trace("TrxRegionEndpoint coprocessor: commitIfPossible - txId " + transactionId + ", Caught exception after internal commit call "
+ e.getMessage() + " " + stackTraceToString(e));
throw new IOException(e.toString());
}
@@ -5178,6 +5223,199 @@ CoprocessorService, Coprocessor {
}
}
}
+ public void flushToFS(Path flushPath) throws IOException {
+     // Serializes this region's in-memory transaction state into a two-cell HFile at flushPath:
+     // cell 1 holds the delimited TransactionStateMsg records, cell 2 the TransactionPersist index.
+     TransactionPersist.Builder txnPersistBuilder = TransactionPersist.newBuilder();
+     fs.delete(flushPath, true); // drop anything already at this path before writing
+     HFileWriterV2 w =
+         (HFileWriterV2)
+         HFile.getWriterFactory(config, new CacheConfig(config))
+         .withPath(fs, flushPath).withFileContext(context).create();
+     try {
+         // Gather every transaction to persist, keyed by transaction id.
+         Map<Long, TrxTransactionState> transactionMap = new HashMap<Long, TrxTransactionState>();
+         for(TrxTransactionState ts : transactionsById.values()) {
+             transactionMap.put(ts.getTransactionId(), ts);
+             txnPersistBuilder.addTxById(ts.getTransactionId());
+         }
+         for(Map.Entry<Long, TrxTransactionState> entry :
+                 commitedTransactionsBySequenceNumber.entrySet()) {
+             transactionMap.put(entry.getValue().getTransactionId(), entry.getValue());
+             txnPersistBuilder.addSeqNoListSeq(entry.getKey());
+             txnPersistBuilder.addSeqNoListTxn(entry.getValue().getTransactionId());
+         }
+         // NOTE(review): this loop re-inserts ts, not ts2, so it adds nothing; presumably ts2 was meant - confirm.
+         for(TrxTransactionState ts : transactionMap.values()) {
+             for(TrxTransactionState ts2 : ts.getTransactionsToCheck()) {
+                 transactionMap.put(ts.getTransactionId(), ts);
+             }
+         }
+         txnPersistBuilder.setNextSeqId(nextSequenceId.get());
+         // One length-delimited TransactionStateMsg per collected transaction.
+         ByteArrayOutputStream output = new ByteArrayOutputStream();
+         for(TrxTransactionState ts : transactionMap.values()) {
+             TransactionStateMsg.Builder tsBuilder = TransactionStateMsg.newBuilder();
+             tsBuilder.setTxId(ts.getTransactionId());
+             tsBuilder.setStartSeqNum(ts.getStartSequenceNumber());
+             tsBuilder.setSeqNum(ts.getHLogStartSequenceId());
+             tsBuilder.setLogSeqId(ts.getLogSeqId());
+             tsBuilder.setReinstated(ts.isReinstated());
+             // -1 stands in for a null commit progress.
+             if(ts.getCommitProgress() == null)
+                 tsBuilder.setCommitProgress(-1);
+             else
+                 tsBuilder.setCommitProgress(ts.getCommitProgress().ordinal());
+             tsBuilder.setStatus(ts.getStatus().ordinal());
+             // putOrDel records, in write order, whether each action went to the put or the delete list.
+             for (WriteAction wa : ts.getWriteOrdering()) {
+                 if(wa.getPut() != null) {
+                     tsBuilder.addPutOrDel(true);
+                     tsBuilder.addPut(ProtobufUtil.toMutation(MutationType.PUT, wa.getPut()));
+                 }
+                 else {
+                     tsBuilder.addPutOrDel(false);
+                     tsBuilder.addDelete(ProtobufUtil.toMutation(MutationType.DELETE, wa.getDelete()));
+                 }
+             }
+             tsBuilder.build().writeDelimitedTo(output);
+         }
+         byte [] firstByte = output.toByteArray();
+         // The "1_" / "2_" key prefixes keep the two cells in ascending key order.
+         w.append(new KeyValue(Bytes.toBytes(COMMITTED_TXNS_KEY), Bytes.toBytes("cf"), Bytes.toBytes("qual"),
+                 firstByte));
+         byte [] persistByte = txnPersistBuilder.build().toByteArray();
+         w.append(new KeyValue(Bytes.toBytes(TXNS_BY_ID_KEY), Bytes.toBytes("cf"), Bytes.toBytes("qual"),
+                 persistByte));
+     } finally {
+         w.close(); // release the writer even when building or appending fails
+     }
+ }
+
+ public void readTxnInfo(Path flushPath) throws IOException { // overload that loads with setRetry = false
+ readTxnInfo(flushPath, false);
+ }
+ public void readTxnInfo(Path flushPath, boolean setRetry) throws IOException {
+     if(LOG.isTraceEnabled()) LOG.trace("readTxnInfo -- ENTRY, Path: " + flushPath.toString());
+     // Rebuilds transaction state from the two-cell file written by flushToFS; IOExceptions are logged, not rethrown.
+     try {
+         HFile.Reader reader = HFile.createReader(fs, flushPath, new CacheConfig(config), config);
+         try {
+             HFileScanner scanner = reader.getScanner(true, false);
+             scanner.seekTo();
+             Cell firstVal = scanner.getKeyValue(); // first cell: delimited TransactionStateMsg records
+             scanner.next();
+             Cell persistKV = scanner.getKeyValue(); // second cell: TransactionPersist message
+
+             if(firstVal == null || persistKV == null) {
+                 throw new IOException("Invalid values read from HFile in readTxnInfo");
+             }
+
+             Map<Long, TrxTransactionState> txnMap = new HashMap<Long, TrxTransactionState>();
+             Map<Long, List<Long>> txnsToCheckMap = new HashMap<Long, List<Long>>();
+             ByteArrayInputStream input = new ByteArrayInputStream(CellUtil.cloneValue(firstVal));
+             // First pass: one TrxTransactionState per stored message; a null message ends the loop.
+             TransactionStateMsg tsm = TransactionStateMsg.parseDelimitedFrom(input);
+             while (tsm != null) {
+                 TrxTransactionState ts = new TrxTransactionState(tsm.getTxId(),
+                         tsm.getSeqNum(),
+                         new AtomicLong(tsm.getLogSeqId()),
+                         m_Region.getRegionInfo(),
+                         m_Region.getTableDesc(),
+                         m_Region.getLog(),
+                         configuredEarlyLogging);
+                 ts.setStartSequenceNumber(tsm.getStartSeqNum());
+
+                 List<Boolean> putOrDel = tsm.getPutOrDelList();
+                 List<MutationProto> puts = tsm.getPutList();
+                 List<MutationProto> deletes = tsm.getDeleteList();
+                 // Replay writes in recorded order, keeping only rows that rowIsInRange accepts for regionInfo.
+                 int putIndex = 0;
+                 int deleteIndex = 0;
+                 for (Boolean put : putOrDel) {
+                     if(put) {
+                         Put writePut = ProtobufUtil.toPut(puts.get(putIndex++));
+                         if(m_Region.rowIsInRange(regionInfo, writePut.getRow())) {
+                             ts.addWrite(writePut);
+                         }
+                     }
+                     else {
+                         Delete writeDelete = ProtobufUtil.toDelete(deletes.get(deleteIndex++));
+                         if(m_Region.rowIsInRange(regionInfo, writeDelete.getRow())) {
+                             ts.addDelete(writeDelete);
+                         }
+                     }
+                 }
+                 txnsToCheckMap.put(tsm.getTxId(), tsm.getTxnsToCheckList());
+                 if(setRetry)
+                     ts.setSplitRetry(true);
+                 txnMap.put(ts.getTransactionId(), ts);
+                 tsm = TransactionStateMsg.parseDelimitedFrom(input);
+             }
+             // Second pass: link each state to the states it lists to check, where those were loaded too.
+             for(TrxTransactionState ts : txnMap.values()) {
+                 for (Long txid : txnsToCheckMap.get(ts.getTransactionId())) {
+                     TrxTransactionState mapTS = txnMap.get(txid);
+                     if(mapTS != null)
+                         ts.addTransactionToCheck(mapTS);
+                 }
+             }
+             TransactionPersist txnPersistMsg = TransactionPersist.parseFrom(CellUtil.cloneValue(persistKV));
+
+             if(txnPersistMsg == null) {
+                 throw new IOException("Invalid protobuf, message is null.");
+             }
+             for (Long txid : txnPersistMsg.getTxByIdList()) {
+                 String key = getTransactionalUniqueId(txid);
+                 TrxTransactionState ts = txnMap.get(txid);
+                 if (ts != null) {
+                     TrxTransactionState existingTs = transactionsById.get(key); // look up by the String map key; the raw Long id never matches
+                     if(existingTs != null) {
+                         for(WriteAction wa : existingTs.getWriteOrdering()) {
+                             if(wa.getPut() != null) {
+                                 ts.addWrite(wa.getPut());
+                             }
+                             else {
+                                 ts.addDelete(wa.getDelete());
+                             }
+                         }
+                     }
+                     transactionsById.put(key, ts);
+                     transactionLeases.createLease(key, transactionLeaseTimeout, new TransactionLeaseListener(txid));
+                 }
+                 else {
+                     TrxTransactionState tsEntry = new TrxTransactionState(txid,
+                             0,
+                             new AtomicLong(0),
+                             m_Region.getRegionInfo(),
+                             m_Region.getTableDesc(),
+                             m_Region.getLog(),
+                             configuredEarlyLogging);
+                     transactionsById.putIfAbsent(key, tsEntry);
+                 }
+             }
+
+             for (int i = 0; i < txnPersistMsg.getSeqNoListSeqCount(); i++) {
+                 TrxTransactionState ts = txnMap.get(txnPersistMsg.getSeqNoListTxn(i));
+                 if (ts!=null)
+                     commitedTransactionsBySequenceNumber.put(txnPersistMsg.getSeqNoListSeq(i), ts);
+             }
+             this.nextSequenceId = new AtomicLong(txnPersistMsg.getNextSeqId());
+         } finally {
+             reader.close(); // both cell values have been cloned by now, so the reader can be released
+         }
+     } catch(IOException e) {
+         StringWriter sw = new StringWriter();
+         PrintWriter pw = new PrintWriter(sw);
+         e.printStackTrace(pw);
+         LOG.error(sw.toString());
+     }
+     if(LOG.isTraceEnabled()) LOG.trace("readTxnInfo -- EXIT");
+ }
+
+ public void setClosing(boolean value) { // stores value in the 'closing' flag
+ closing.set(value);
+ }
}
//1}