You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by sa...@apache.org on 2017/05/12 20:25:39 UTC
[04/22] incubator-trafodion git commit: This is a large contribution
of changes from Esgyn TransactionManager and libraries that are collectively
much better tested and hardened than Trafodion,
but are too numerous and complex to cherry pick individually
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java
index 31b35d4..dfd06a9 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/SsccTransactionState.java
@@ -84,7 +84,7 @@ public class SsccTransactionState extends TransactionState{
public SsccTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId,
final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging, long SsccSequenceId) {
- super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging, /* region TX */ false);
+ super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging,SsccSequenceId, /* region TX */ false);
setStartId(SsccSequenceId);
if(LOG.isTraceEnabled()) LOG.trace("SsccTransactionState : new state object for transid: " + transactionId + " with sequence: " + SsccSequenceId + " complete");
}
@@ -93,7 +93,7 @@ public class SsccTransactionState extends TransactionState{
final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging, long SsccSequenceId,
boolean regionTx) {
- super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging, regionTx);
+ super(transactionId,rLogStartSequenceId,hlogSeqId,regionInfo,htd,hLog,logging,SsccSequenceId,regionTx);
setStartId(SsccSequenceId);
if(LOG.isTraceEnabled()) LOG.trace("SsccTransactionState : new state object for transid: " + transactionId + " with sequence: " + SsccSequenceId + " complete");
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
index a7eae1f..e48062a 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
@@ -72,6 +72,9 @@ public class TransactionState {
protected static final Log LOG = LogFactory.getLog(TransactionState.class);
+ protected long startId_;
+ protected long commitSequenceId;
+
/** Current commit progress */
public enum CommitProgress {
/** Initial status, still performing operations. */
@@ -88,6 +91,10 @@ public class TransactionState {
* Checked if we can commit, and said yes. Still need to determine the global decision.
*/
COMMIT_PENDING,
+ /**
+ * Checked if we can commit, and writeOrdering is empty.
+ */
+ COMMIT_READONLY,
/** Committed. */
COMMITED,
/** Aborted. */
@@ -106,7 +113,10 @@ public class TransactionState {
protected long controlPointEpochAtPrepare = 0;
protected int reInstated = 0;
protected long flushTxId = 0;
+ protected long nodeId;
+ protected long clusterId;
+ protected boolean neverReadOnly = false;
protected boolean splitRetry = false;
protected boolean earlyLogging = false;
protected boolean commit_TS_CC = false;
@@ -124,7 +134,8 @@ public class TransactionState {
public static final byte TS_TRAFODION_TXN_TAG_TYPE = 41;
public TransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId, final HRegionInfo regionInfo,
- HTableDescriptor htd, WAL hLog, boolean logging, boolean isRegionTx) {
+ HTableDescriptor htd, WAL hLog, boolean logging, final long startId,
+ boolean isRegionTx) {
Tag transactionalTag = null;
if (LOG.isTraceEnabled()) LOG.trace("Create TS object for " + transactionId + " early logging " + logging);
this.transactionId = transactionId;
@@ -136,14 +147,18 @@ public class TransactionState {
this.tabledescriptor = htd;
this.earlyLogging = logging;
this.tHLog = hLog;
+ setStartId(startId);
+ setNodeId();
+ setClusterId();
+
if(isRegionTx){ // RegionTX takes precedence
- transactionalTag = this.formTransactionalContextTag(TS_REGION_TX_COMMIT_REQUEST);
+ transactionalTag = this.formTransactionalContextTag(TS_REGION_TX_COMMIT_REQUEST, startId);
}
else if (earlyLogging) {
- transactionalTag = this.formTransactionalContextTag(TS_ACTIVE);
+ transactionalTag = this.formTransactionalContextTag(TS_ACTIVE, startId);
}
else {
- transactionalTag = this.formTransactionalContextTag(TS_COMMIT_REQUEST);
+ transactionalTag = this.formTransactionalContextTag(TS_COMMIT_REQUEST, startId);
}
tagList.add(transactionalTag);
}
@@ -172,14 +187,15 @@ public class TransactionState {
return result;
}
- public Tag formTransactionalContextTag(int transactionalOp) {
+ public Tag formTransactionalContextTag(int transactionalOp, long ts) {
byte[] tid = Bytes.toBytes (this.transactionId);
byte[] logSeqId = Bytes.toBytes(this.hLogStartSequenceId);
byte[] type = Bytes.toBytes(transactionalOp);
int vers = 1;
byte[] version = Bytes.toBytes(vers);
+ byte[] tsId = Bytes.toBytes(ts);
- byte[] tagBytes = concat(version, type, tid, logSeqId);
+ byte[] tagBytes = concat(version, type, tid, logSeqId, tsId);
byte tagType = TS_TRAFODION_TXN_TAG_TYPE;
Tag tag = new Tag(tagType, tagBytes);
return tag;
@@ -197,6 +213,18 @@ public class TransactionState {
}
}
}
+
+ // Same as updateLatestTimestamp except there is no test for isLatestTimestamp()
+ public static void unconditionalUpdateLatestTimestamp(final Collection<List<Cell>> kvsCollection, final long time) {
+ byte[] timeBytes = Bytes.toBytes(time);
+ // HAVE to manually set the KV timestamps
+ for (List<Cell> kvs : kvsCollection) {
+ for (Cell cell : kvs) {
+ KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
+ kv.updateLatestStamp(timeBytes);
+ }
+ }
+ }
/**
* Returns a boolean indicating whether or not this is a region transaction.
*
@@ -214,7 +242,7 @@ public class TransactionState {
*/
public long getNodeId() {
- return ((transactionId >> 32) & 0xFFL);
+ return nodeId;
}
/**
@@ -226,6 +254,15 @@ public class TransactionState {
return ((transId >> 32) & 0xFFL);
}
+
+ /**
+ * Set the originating node of the transaction.
+ *
+ */
+ private void setNodeId() {
+ nodeId = ((transactionId >> 32) & 0xFFL);
+ }
+
/**
* Get the originating cluster of the passed in transaction.
*
@@ -233,9 +270,27 @@ public class TransactionState {
*/
public static long getClusterId(long transId) {
- return ((transId >> 48) & 0xFFL);
+ return (transId >> 48);
}
+ /**
+ * Get the originating cluster of this transaction.
+ *
+ * @return Return the clusterId.
+ */
+ public long getClusterId() {
+
+ return clusterId;
+ }
+
+ /**
+ * Set the originating clusterId from this transaction's transactionId.
+ *
+ */
+ private void setClusterId() {
+
+ clusterId = (transactionId >> 48);
+ }
/**
* Get the status.
@@ -250,6 +305,14 @@ public class TransactionState {
return logSeqId.get();
}
+ public void setNeverReadOnly(boolean value) {
+ neverReadOnly = value;
+ }
+
+ public boolean getNeverReadOnly() {
+ return neverReadOnly;
+ }
+
public void setSplitRetry(boolean value) {
splitRetry = value;
}
@@ -278,6 +341,33 @@ public class TransactionState {
return xaOperation;
}
+ public void setStartId(long startId)
+ {
+ startId_ = startId;
+ }
+
+ public long getStartId()
+ {
+ return startId_;
+ }
+
+ /**
+ * Get the commitId for this transaction.
+ *
+ * @return Return the commitSequenceId.
+ */
+ public synchronized long getCommitId() {
+ return commitSequenceId;
+ }
+
+ /**
+ * Set the commitId for this transaction.
+ *
+ */
+ public synchronized void setCommitId(final long Id) {
+ this.commitSequenceId = Id;
+ }
+
/**
* Get the CP epoch at Prepare.
*
@@ -381,7 +471,7 @@ public class TransactionState {
@Override
public String toString() {
return "transactionId: " + transactionId + ", regionTX: " + getIsRegionTx()
- + ", status: " + status + ", regionInfo: " + regionInfo;
+ + ", status: " + status + ", neverReadOnly " + neverReadOnly + ", regionInfo: " + regionInfo;
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java
index c796c2c..b4096f0 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionScannerHolder.java
@@ -25,7 +25,7 @@ package org.apache.hadoop.hbase.regionserver.transactional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
/**
@@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
*/
public class TransactionalRegionScannerHolder {
public RegionScanner s;
- public HRegion r;
+ public Region r;
public long nextCallSeq;
public long numberOfRows;
public long rowsRemaining;
@@ -41,10 +41,11 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
public long scannerId;
public boolean hasMore;
+
public TransactionalRegionScannerHolder(long transId,
long scannerId,
RegionScanner s,
- HRegion r) {
+ Region r) {
this.transId = transId;
this.scannerId = scannerId;
this.s = s;
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl
index 6ec061d..131d1ee 100644
--- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/regionserver/transactional/TrxTransactionState.java.tmpl
@@ -76,12 +76,18 @@ import org.apache.hadoop.hbase.regionserver.ScannerContext;
#endif
#ifdef CDH5.7 APACHE1.2
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.conf.Configuration;
#endif
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.io.DataInputBuffer;
+#ifdef CDH5.7 APACHE1.2
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+#endif
/**
* Holds the state of a transaction. This includes a buffer of all writes, a record of all reads / scans, and
@@ -90,29 +96,35 @@ import org.apache.hadoop.io.DataInputBuffer;
public class TrxTransactionState extends TransactionState {
static java.lang.reflect.Constructor c1_0 = null;
+#ifdef CDH5.7 APACHE1.2
+ static Configuration config = HBaseConfiguration.create();
+#endif
static {
String version = VersionInfo.getVersion();// the hbase version string, eg. "0.6.3-dev"
LOG.info("Got info of Class ScanQueryMatcher for HBase version :" + version);
- try {
- NavigableSet<byte[]> lv_nvg = (NavigableSet<byte[]>) null;
- c1_0 = ScanQueryMatcher.class.getConstructor(new Class[] { Scan.class,
- ScanInfo.class,
- java.util.NavigableSet.class,
- ScanType.class,
- long.class,
- long.class,
- long.class,
- long.class,
- RegionCoprocessorHost.class });
- if (c1_0 != null)
- LOG.info("Got info of Class ScanQueryMatcher for HBase 1.0");
-
- } catch (NoSuchMethodException exc_nsm) {
- LOG.info("TrxRegionEndpoint coprocessor, No matching ScanQueryMatcher : Threw an exception");
- }
- }
-
+ try {
+ NavigableSet<byte[]> lv_nvg = (NavigableSet<byte[]>) null;
+ c1_0 = ScanQueryMatcher.class.getConstructor(
+ new Class [] {
+ Scan.class,
+ ScanInfo.class,
+ java.util.NavigableSet.class,
+ ScanType.class,
+ long.class,
+ long.class,
+ long.class,
+ long.class,
+ RegionCoprocessorHost.class
+ });
+ if (c1_0 != null)
+ LOG.info("Got info of Class ScanQueryMatcher for HBase 1.0");
+
+ }
+ catch (NoSuchMethodException exc_nsm) {
+ LOG.info("TrxRegionEndpoint coprocessor, No matching ScanQueryMatcher : Threw an exception");
+ }
+ }
/**
* Simple container of the range of the scanners we've opened. Used to check for conflicting writes.
*/
@@ -155,9 +167,19 @@ public class TrxTransactionState extends TransactionState {
private Set<TrxTransactionState> transactionsToCheck = Collections.synchronizedSet(new HashSet<TrxTransactionState>());
private WALEdit e;
private boolean dropTableRecorded;
+ private HRegion h_Region = null;
+ public long prepareEditSize = 0;
+ public long endEditSize = 0;
public TrxTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId,
- final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging) {
+ final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog,
+#ifdef CDH5.7 APACHE1.2
+ boolean logging, HRegion hRegion,
+#else
+ boolean logging,
+#endif
+ final long startId) {
+
super(transactionId,
rLogStartSequenceId,
hlogSeqId,
@@ -165,16 +187,29 @@ public class TrxTransactionState extends TransactionState {
htd,
hLog,
logging,
+ startId,
false); // not a region transaction
this.e = new WALEdit();
dropTableRecorded = false;
+#ifdef CDH5.7 APACHE1.2
+ this.h_Region = hRegion;
+#endif
}
public TrxTransactionState(final long transactionId, final long rLogStartSequenceId, AtomicLong hlogSeqId,
- final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog, boolean logging, boolean regionTx) {
- super(transactionId, rLogStartSequenceId, hlogSeqId, regionInfo, htd, hLog, logging, regionTx);
+ final HRegionInfo regionInfo, HTableDescriptor htd, WAL hLog,
+#ifdef CDH5.7 APACHE1.2
+ boolean logging, HRegion hRegion,
+#else
+ boolean logging,
+#endif
+ final long startId, boolean regionTx) {
+ super(transactionId, rLogStartSequenceId, hlogSeqId, regionInfo, htd, hLog, logging, startId, regionTx);
this.e = new WALEdit();
dropTableRecorded = false;
+#ifdef CDH5.7 APACHE1.2
+ this.h_Region = hRegion;
+#endif
}
public void setDropTableRecorded(boolean dropTableRecord) {
@@ -195,53 +230,97 @@ public class TrxTransactionState extends TransactionState {
scans.add(new ScanRange(rowKey, rowKey));
}
- public synchronized void addWrite(final Put write) {
- if (LOG.isTraceEnabled())
- LOG.trace("addWrite -- ENTRY: write: " + write.toString());
+ public synchronized void addWrite(final Put put, final boolean useStartId) {
+ if (!this.getStatus().equals(Status.PENDING)) { // Active
+ LOG.error("addWrite late checkin for transaction " + this);
+ }
+
+#ifdef CDH5.7 APACHE1.2
+ MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
+ long mvccNum = 0;
+#endif
+ if (LOG.isTraceEnabled()) LOG.trace("addWrite -- ENTRY: useStartId: " + useStartId + " put: " + put.toString());
WriteAction waction;
KeyValue kv;
WALEdit e1 = new WALEdit();
- updateLatestTimestamp(write.getFamilyCellMap().values(), EnvironmentEdgeManager.currentTime());
+ // If useStartId is true we use the startId as the timestamp,
+ // otherwise we use the current timestamp
+ long now;
+ if (useStartId == true) {
+ now = getStartId();
+ unconditionalUpdateLatestTimestamp(put.getFamilyCellMap().values(), now);
+ }
+ else {
+ now = EnvironmentEdgeManager.currentTime();
+ updateLatestTimestamp(put.getFamilyCellMap().values(), now);
+ }
// Adding read scan on a write action
- addRead(new WriteAction(write).getRow());
+ addRead(new WriteAction(put).getRow());
ListIterator<WriteAction> writeOrderIter = writeOrdering.listIterator(writeOrdering.size());
- writeOrderIter.add(waction = new WriteAction(write));
+ writeOrderIter.add(waction = new WriteAction(put));
if (this.earlyLogging) { // immediately write edit out to HLOG during DML (active transaction state)
- for (Cell value : waction.getCells()) {
- // KeyValue kv = KeyValueUtil.ensureKeyValue(value);
- kv = KeyValue.cloneAndAddTags(value, tagList);
- // if (LOG.isTraceEnabled()) LOG.trace("KV hex dump " + Hex.encodeHexString(kv.getValueArray()
- // /*kv.getBuffer()*/));
- e1.add(kv);
- e.add(kv);
+ for (Cell value : waction.getCells()) {
+ //KeyValue kv = KeyValueUtil.ensureKeyValue(value);
+ kv = KeyValue.cloneAndAddTags(value, tagList);
+ if (LOG.isTraceEnabled()) LOG.trace("addWrite kv hex dump " + Hex.encodeHexString(kv.getValueArray() /*kv.getBuffer()*/));
+ e1.add(kv);
+ e.add(kv);
}
- try {
- // long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
- // e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
- // this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
- final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), this.regionInfo.getTable(),
- EnvironmentEdgeManager.currentTime());
+ try {
+ //long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
+ // e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
+ // this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
+
#ifdef CDH5.7 APACHE1.2
- long txid = this.tHLog.append(this.tabledescriptor, this.regionInfo, wk, e1, false);
+ WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
+ this.regionInfo.getTable(),
+ WALKey.NO_SEQUENCE_ID,
+ EnvironmentEdgeManager.currentTime(),
+ WALKey.EMPTY_UUIDS,
+ HConstants.NO_NONCE,
+ HConstants.NO_NONCE,
+ this.h_Region.getMVCC());
+ long txid = this.tHLog.append(this.tabledescriptor,this.regionInfo, wk, e1, false);
+
+ writeEntry = wk.getWriteEntry();
+ mvccNum = writeEntry.getWriteNumber();
+
+ if (writeEntry != null) {
+ this.h_Region.getMVCC().completeAndWait(writeEntry);
+ writeEntry = null;
+ }
+
#else
- long txid = this.tHLog.append(this.tabledescriptor, this.regionInfo, wk, e1, this.logSeqId, false, null);
+ final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), this.regionInfo.getTable(), now);
+ long txid = this.tHLog.append(this.tabledescriptor,this.regionInfo, wk , e1, this.logSeqId, false, null);
#endif
- // if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y11 write edit to HLOG during put with txid
- // " + txid + " ts flush id " + this.flushTxId);
- if (txid > this.flushTxId)
- this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase
- // 1
- } catch (IOException exp1) {
- LOG.info("TrxRegionEndpoint coprocessor addWrite writing to HLOG for early logging: Threw an exception");
- // throw exp1;
- }
- } else { // edits are buffered in ts and written out to HLOG in phase 1
- for (Cell value : waction.getCells()) {
- kv = KeyValue.cloneAndAddTags(value, tagList);
- e.add(kv);
- }
+ //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y11 write edit to HLOG during put with txid " + txid + " ts flush id " + this.flushTxId);
+ if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1
+ }
+ catch (IOException exp1) {
+ LOG.info("TrxRegionEndpoint coprocessor addWrite writing to HLOG for early logging: Threw an exception");
+ //throw exp1;
+ }
+#ifdef CDH5.7 APACHE1.2
+ finally {
+ if (writeEntry != null) {
+ this.h_Region.getMVCC().completeAndWait(writeEntry);
+ writeEntry = null;
+ }
+ } // finally
+#endif
+ }
+ else { // edits are buffered in ts and written out to HLOG in phase 1
+ for (Cell value : waction.getCells()) {
+ kv = KeyValue.cloneAndAddTags(value, tagList);
+ if (LOG.isTraceEnabled()) LOG.trace("addWrite kv hex dump "
+ + " key: " + Hex.encodeHexString(Bytes.toBytes(kv.getKeyString()))
+ + " timestamp: " + kv.getTimestamp()
+ + Hex.encodeHexString(kv.getValueArray() /*kv.getBuffer()*/));
+ e.add(kv);
+ }
}
if (LOG.isTraceEnabled())
LOG.trace("addWrite -- EXIT");
@@ -255,16 +334,32 @@ public class TrxTransactionState extends TransactionState {
return writeOrdering.size();
}
- public synchronized void addDelete(final Delete delete) {
- if (LOG.isTraceEnabled())
- LOG.trace("addDelete -- ENTRY: delete: " + delete.toString());
+ public synchronized void addDelete(final Delete delete, final boolean useStartId) {
+ if (!this.getStatus().equals(Status.PENDING)) { // Active
+ LOG.error("addDelete late checkin for transaction " + this);
+ }
+#ifdef CDH5.7 APACHE1.2
+ MultiVersionConcurrencyControl.WriteEntry writeEntry = null;
+ long mvccNum = 0;
+#endif
+ if (LOG.isTraceEnabled()) LOG.trace("addDelete -- ENTRY: useStartId: " + useStartId + " delete: " + delete.toString());
WriteAction waction;
- WALEdit e1 = new WALEdit();
- long now = EnvironmentEdgeManager.currentTime();
- updateLatestTimestamp(delete.getFamilyCellMap().values(), now);
- if (delete.getTimeStamp() == HConstants.LATEST_TIMESTAMP) {
- delete.setTimestamp(now);
+ KeyValue kv;
+ WALEdit e1 = new WALEdit();
+ // If useStartId is true we use the startId as the timestamp,
+ // otherwise we use the current timestamp
+ long now;
+ if (useStartId == true) {
+ now = getStartId();
+ unconditionalUpdateLatestTimestamp(delete.getFamilyCellMap().values(), now);
+ }
+ else {
+ now = EnvironmentEdgeManager.currentTime();
+ updateLatestTimestamp(delete.getFamilyCellMap().values(), now);
+ if (delete.getTimeStamp() == HConstants.LATEST_TIMESTAMP) {
+ delete.setTimestamp(now);
+ }
}
deletes.add(delete);
@@ -272,40 +367,63 @@ public class TrxTransactionState extends TransactionState {
writeOrderIter.add(waction = new WriteAction(delete));
if (this.earlyLogging) {
- for (Cell value : waction.getCells()) {
- KeyValue kv = KeyValue.cloneAndAddTags(value, tagList);
- e1.add(kv);
- e.add(kv);
- }
- try {
- // long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
- // e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
- // this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
- final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), this.regionInfo.getTable(),
- EnvironmentEdgeManager.currentTime());
+ for (Cell value : waction.getCells()) {
+ kv = KeyValue.cloneAndAddTags(value, tagList);
+ e1.add(kv);
+ e.add(kv);
+ }
+ try {
+ //long txid = this.tHLog.appendNoSync(this.regionInfo, this.regionInfo.getTable(),
+ // e1, new ArrayList<UUID>(), EnvironmentEdgeManager.currentTimeMillis(), this.tabledescriptor,
+ // this.logSeqId, false, HConstants.NO_NONCE, HConstants.NO_NONCE);
#ifdef CDH5.7 APACHE1.2
- long txid = this.tHLog.append(this.tabledescriptor, this.regionInfo, wk, e1, false);
+ WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(),
+ this.regionInfo.getTable(),
+ WALKey.NO_SEQUENCE_ID,
+ EnvironmentEdgeManager.currentTime(),
+ WALKey.EMPTY_UUIDS,
+ HConstants.NO_NONCE,
+ HConstants.NO_NONCE,
+ this.h_Region.getMVCC());
+ long txid = this.tHLog.append(this.tabledescriptor,this.regionInfo, wk, e1, false);
+
+ writeEntry = wk.getWriteEntry();
+ mvccNum = writeEntry.getWriteNumber();
+
+ if (writeEntry != null) {
+ this.h_Region.getMVCC().completeAndWait(writeEntry);
+ writeEntry = null;
+ }
+
#else
- long txid = this.tHLog.append(this.tabledescriptor, this.regionInfo, wk, e1, this.logSeqId, false, null);
+ final WALKey wk = new WALKey(this.regionInfo.getEncodedNameAsBytes(), this.regionInfo.getTable(), now);
+ long txid = this.tHLog.append(this.tabledescriptor,this.regionInfo, wk , e1, this.logSeqId, false, null);
#endif
- // if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y00 write edit to HLOG during delete with
- // txid " + txid + " ts flush id " + this.flushTxId);
- if (txid > this.flushTxId)
- this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase
- // 1
- } catch (IOException exp1) {
- LOG.info("TrxRegionEndpoint coprocessor addDelete writing to HLOG for early logging: Threw an exception");
- }
- } else {
- for (Cell value : waction.getCells()) {
- KeyValue kv = KeyValue.cloneAndAddTags(value, tagList);
- e.add(kv);
- }
- }
- if (LOG.isTraceEnabled())
- LOG.trace("addDelete -- EXIT");
+ //if (LOG.isTraceEnabled()) LOG.trace("Trafodion Recovery: Y00 write edit to HLOG during delete with txid " + txid + " ts flush id " + this.flushTxId);
+ if (txid > this.flushTxId) this.flushTxId = txid; // save the log txid into TS object, later sync on largestSeqid during phase 1
+ }
+ catch (IOException exp1) {
+ LOG.info("TrxRegionEndpoint coprocessor addDelete writing to HLOG for early logging: Threw an exception");
+ }
+#ifdef CDH5.7 APACHE1.2
+ finally {
+ if (writeEntry != null) {
+ this.h_Region.getMVCC().completeAndWait(writeEntry);
+ writeEntry = null;
+ }
+ } // finally
+#endif
+ }
+ else {
+ for (Cell value : waction.getCells()) {
+ kv = KeyValue.cloneAndAddTags(value, tagList);
+ e.add(kv);
+ } // all cells
+ } // not early logging
+
+ if (LOG.isTraceEnabled()) LOG.trace("addDelete -- EXIT");
}
public synchronized void applyDeletes(final List<Cell> input, final long minTime, final long maxTime) {
@@ -512,12 +630,20 @@ public class TrxTransactionState extends TransactionState {
result.append(getIsRegionTx());
result.append(" status: ");
result.append(status.name());
+ result.append(" neverReadOnly: ");
+ result.append(getNeverReadOnly());
result.append(" scan Size: ");
result.append(scans.size());
result.append(" write Size: ");
result.append(getWriteOrdering().size());
result.append(" startSQ: ");
result.append(startSequenceNumber);
+ result.append(" prepareEditSize: ");
+ result.append(prepareEditSize);
+ result.append(" endEditSize: ");
+ result.append(endEditSize);
+ result.append(" editSize: ");
+ result.append(getEdit().getCells().size());
if (sequenceNumber != null) {
result.append(" commitedSQ:");
result.append(sequenceNumber);
@@ -541,9 +667,9 @@ public class TrxTransactionState extends TransactionState {
*
* @return deletes
*/
- public synchronized List<Delete> getDeletes() {
- return deletes;
- }
+ //public synchronized List<Delete> getDeletes() {
+ // return deletes;
+ //}
/**
* Get a scanner to go through the puts and deletes from this transaction. Used to weave together the local trx puts
@@ -601,7 +727,8 @@ public class TrxTransactionState extends TransactionState {
@SuppressWarnings("deprecation")
private synchronized KeyValue[] getAllKVs(final Scan scan) {
- // if (LOG.isTraceEnabled()) LOG.trace("getAllKVs -- ENTRY");
+ if (LOG.isTraceEnabled()) LOG.trace("getAllKVs -- ENTRY for transId " + this.getTransactionId()
+ + " with writeOrdering size " + this.getWriteOrdering().size());
List<Cell> kvList = new ArrayList<Cell>();
ListIterator<WriteAction> writeOrderIter = null;
@@ -694,7 +821,7 @@ public class TrxTransactionState extends TransactionState {
//Store.ScanInfo scaninfo = new Store.ScanInfo(null, 0, 1, HConstants.FOREVER, false, 0, Cell.COMPARATOR);
#ifdef CDH5.7 APACHE1.2
- ScanInfo scaninfo = new ScanInfo(HBaseConfiguration.create(), null, 0, 1, HConstants.FOREVER,KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
+ ScanInfo scaninfo = new ScanInfo(getConfig(), null, 0, 1, HConstants.FOREVER,KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
#else
ScanInfo scaninfo = new ScanInfo(null, 0, 1, HConstants.FOREVER,KeepDeletedCells.FALSE, 0, KeyValue.COMPARATOR);
#endif
@@ -1003,12 +1130,11 @@ public class TrxTransactionState extends TransactionState {
}
/**
- * before adding a put to the writeOrdering list, check whether we have deleted a row with the same key. If true
- * remove the delete before adding the put
+ * Before adding a put to the writeOrdering list, check whether we have deleted a row with the
+ * same key. If so, remove the delete before adding the put.
*/
public void removeDelBeforePut(Put put) {
- if (LOG.isTraceEnabled())
- LOG.trace("removeDelBeforePut put : " + put);
+ if (LOG.isTraceEnabled()) LOG.trace("removeDelBeforePut put : " + put);
byte[] putRow = put.getRow();
KeyValue kv;
@@ -1032,4 +1158,9 @@ public class TrxTransactionState extends TransactionState {
}
}
}
+#ifdef CDH5.7 APACHE1.2
+ public Configuration getConfig() {
+ return config;
+ }
+#endif
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
index a0a1385..a69e6b8 100755
--- a/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/protobuf/TrxRegion.proto
@@ -36,6 +36,7 @@ message AbortTransactionRequest{
required int64 transactionId = 2;
required int32 participantNum = 3;
required bool dropTableRecorded = 4;
+ optional bool ignoreUnknownTransactionException = 5;
}
message AbortTransactionResponse {
@@ -56,7 +57,8 @@ message AbortTransactionMultipleResponse {
message BeginTransactionRequest{
required int64 transactionId = 1;
- required bytes regionName = 2;
+ required int64 startId = 2;
+ required bytes regionName = 3;
}
message BeginTransactionResponse {
@@ -67,8 +69,9 @@ message BeginTransactionResponse {
message CommitRequest{
required bytes regionName = 1;
required int64 transactionId = 2;
- required int32 participantNum = 3;
- optional bool ignoreUnknownTransactionException = 4;
+ required int64 commitId = 3;
+ required int32 participantNum = 4;
+ optional bool ignoreUnknownTransactionException = 5;
}
message CommitResponse {
@@ -79,8 +82,9 @@ message CommitResponse {
message CommitMultipleRequest{
repeated bytes regionName = 1;
required int64 transactionId = 2;
- required int32 participantNum = 3;
- optional bool ignoreUnknownTransactionException = 4;
+ required int64 commitId = 3;
+ required int32 participantNum = 4;
+ optional bool ignoreUnknownTransactionException = 5;
}
message CommitMultipleResponse {
@@ -130,12 +134,13 @@ message CommitIfPossibleResponse {
message CheckAndDeleteRequest {
required int64 transactionId = 1;
- required bytes regionName = 2;
- required bytes row = 3;
- required bytes family = 4;
- required bytes qualifier = 5;
- required bytes value = 6;
- required MutationProto delete = 7;
+ required int64 startId = 2;
+ required bytes regionName = 3;
+ required bytes row = 4;
+ required bytes family = 5;
+ required bytes qualifier = 6;
+ required bytes value = 7;
+ required MutationProto delete = 8;
}
message CheckAndDeleteResponse {
@@ -146,13 +151,14 @@ message CheckAndDeleteResponse {
message CheckAndDeleteRegionTxRequest {
required int64 tid = 1;
- required bytes regionName = 2;
- required bytes row = 3;
- required bytes family = 4;
- required bytes qualifier = 5;
- required bytes value = 6;
- required MutationProto delete = 7;
- required bool autoCommit = 8;
+ required int64 commitId = 2;
+ required bytes regionName = 3;
+ required bytes row = 4;
+ required bytes family = 5;
+ required bytes qualifier = 6;
+ required bytes value = 7;
+ required MutationProto delete = 8;
+ required bool autoCommit = 9;
}
message CheckAndDeleteRegionTxResponse {
@@ -163,12 +169,13 @@ message CheckAndDeleteRegionTxResponse {
message CheckAndPutRequest {
required int64 transactionId = 1;
- required bytes regionName = 2;
- required bytes row = 3;
- required bytes family = 4;
- required bytes qualifier = 5;
- required bytes value = 6;
- required MutationProto put = 7;
+ required int64 startId = 2;
+ required bytes regionName = 3;
+ required bytes row = 4;
+ required bytes family = 5;
+ required bytes qualifier = 6;
+ required bytes value = 7;
+ required MutationProto put = 8;
}
message CheckAndPutResponse {
@@ -179,13 +186,14 @@ message CheckAndPutResponse {
message CheckAndPutRegionTxRequest {
required int64 tid = 1;
- required bytes regionName = 2;
- required bytes row = 3;
- required bytes family = 4;
- required bytes qualifier = 5;
- required bytes value = 6;
- required MutationProto put = 7;
- required bool autoCommit = 8;
+ required int64 commitId = 2;
+ required bytes regionName = 3;
+ required bytes row = 4;
+ required bytes family = 5;
+ required bytes qualifier = 6;
+ required bytes value = 7;
+ required MutationProto put = 8;
+ required bool autoCommit = 9;
}
message CheckAndPutRegionTxResponse {
@@ -207,8 +215,9 @@ message CloseScannerResponse {
message DeleteMultipleTransactionalRequest {
required int64 transactionId = 1;
- required bytes regionName = 2;
- repeated MutationProto delete = 3;
+ required int64 startId = 2;
+ required bytes regionName = 3;
+ repeated MutationProto delete = 4;
}
message DeleteMultipleTransactionalResponse {
@@ -219,9 +228,10 @@ message DeleteMultipleTransactionalResponse {
message DeleteRegionTxRequest {
required int64 tid = 1;
- required bytes regionName = 2;
- required MutationProto delete = 3;
- required bool autoCommit = 4;
+ required int64 commitId = 2;
+ required bytes regionName = 3;
+ required MutationProto delete = 4;
+ required bool autoCommit = 5;
}
message DeleteRegionTxResponse {
@@ -232,8 +242,9 @@ message DeleteRegionTxResponse {
message DeleteTransactionalRequest {
required int64 transactionId = 1;
- required bytes regionName = 2;
- required MutationProto delete = 3;
+ required int64 startId = 2;
+ required bytes regionName = 3;
+ required MutationProto delete = 4;
}
message DeleteTransactionalResponse {
@@ -244,8 +255,9 @@ message DeleteTransactionalResponse {
message GetTransactionalRequest {
required int64 transactionId = 1;
- required bytes regionName = 2;
- required Get get = 3;
+ required int64 startId = 2;
+ required bytes regionName = 3;
+ required Get get = 4;
}
message GetTransactionalResponse {
@@ -256,8 +268,9 @@ message GetTransactionalResponse {
message OpenScannerRequest {
required int64 transactionId = 1;
- required bytes regionName = 2;
- required Scan scan = 3;
+ required int64 startId = 2;
+ required bytes regionName = 3;
+ required Scan scan = 4;
}
message OpenScannerResponse {
@@ -268,11 +281,12 @@ message OpenScannerResponse {
message PerformScanRequest {
required int64 transactionId = 1;
- required bytes regionName = 2;
- required int64 scannerId = 3;
- required int32 numberOfRows = 4;
- required bool closeScanner = 5;
- required int64 nextCallSeq = 6;
+ required int64 startId = 2;
+ required bytes regionName = 3;
+ required int64 scannerId = 4;
+ required int32 numberOfRows = 5;
+ required bool closeScanner = 6;
+ required int64 nextCallSeq = 7;
}
message PerformScanResponse {
@@ -286,9 +300,10 @@ message PerformScanResponse {
message PutRegionTxRequest {
required int64 tid = 1;
- required bytes regionName = 2;
- required MutationProto put = 3;
- required bool autoCommit = 4;
+ required int64 commitId = 2;
+ required bytes regionName = 3;
+ required MutationProto put = 4;
+ required bool autoCommit = 5;
}
message PutRegionTxResponse {
@@ -299,8 +314,9 @@ message PutRegionTxResponse {
message PutTransactionalRequest {
required int64 transactionId = 1;
- required bytes regionName = 2;
- required MutationProto put = 3;
+ required int64 startId = 2;
+ required bytes regionName = 3;
+ required MutationProto put = 4;
}
message PutTransactionalResponse {
@@ -311,8 +327,9 @@ message PutTransactionalResponse {
message PutMultipleTransactionalRequest {
required int64 transactionId = 1;
- required bytes regionName = 2;
- repeated MutationProto put = 3;
+ required int64 startId = 2;
+ required bytes regionName = 3;
+ repeated MutationProto put = 4;
}
message PutMultipleTransactionalResponse {
@@ -346,55 +363,45 @@ message RecoveryRequestResponse {
message TlogDeleteRequest{
required bytes regionName = 1;
- required int64 transactionId = 2;
- required Scan scan = 3;
- required int64 auditSeqNum = 4;
- required bool ageCommitted = 5;
+ required Scan scan = 2;
+ required int64 auditSeqNum = 3;
+ required bool ageCommitted = 4;
}
message TlogDeleteResponse {
repeated Result result = 1;
required int64 count = 2;
- optional string exception = 3;
- optional bool hasException = 4;
+ optional bool hasException = 3;
+ optional string exception = 4;
}
message TlogWriteRequest{
required bytes regionName = 1;
required int64 transactionId = 2;
required MutationProto put = 3;
- required bytes row = 4;
- required bytes family = 5;
- required bytes qualifier = 6;
- required bytes value = 7;
- required int64 commitId = 8;
- optional bool forced = 9;
+ required bytes family = 4;
+ required bytes qualifier = 5;
+ required int64 commitId = 6;
+ optional bool forced = 7;
}
message TlogWriteResponse {
repeated int64 result = 1;
- optional string exception = 2;
- optional bool hasException = 3;
+ optional bool hasException = 2;
+ optional string exception = 3;
}
message TlogTransactionStatesFromIntervalRequest{
- required bytes regionName = 1;
- required int64 transactionId = 2;
- required int64 clusterId = 3;
- required int64 auditSeqNum =4;
- required int64 scannerId = 5;
- required int32 numberOfRows = 6;
- required int64 nextCallSeq = 7;
- required bool closeScanner = 8;
+ required int64 clusterId = 1;
+ required int64 auditSeqNum =2;
+ required Scan scan = 3;
}
message TlogTransactionStatesFromIntervalResponse {
repeated Result result = 1;
required int64 count = 2;
- required int64 nextCallSeq = 3;
- required bool hasMore = 4;
- optional string exception = 5;
- optional bool hasException = 6;
+ optional string exception = 3;
+ optional bool hasException = 4;
}
message TransactionalAggregateRequest {
@@ -406,9 +413,11 @@ message TransactionalAggregateRequest {
*/
required bytes regionName = 1;
required int64 transactionId = 2;
- required string interpreter_class_name = 3;
- required Scan scan = 4;
- optional bytes interpreter_specific_bytes = 5;
+ required int64 startId = 3;
+ required string interpreter_class_name = 4;
+ required Scan scan = 5;
+ optional bytes interpreter_specific_bytes = 6;
+
}
message TransactionalAggregateResponse {
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/91794b53/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
index ba81304..a595d42 100644
--- a/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
+++ b/core/sqf/src/seatrans/tm/hbasetmlib2/src/main/java/org/trafodion/dtm/HBaseAuditControlPoint.java
@@ -83,10 +83,14 @@ public class HBaseAuditControlPoint {
private Configuration config;
private static String CONTROL_POINT_TABLE_NAME;
private static final byte[] CONTROL_POINT_FAMILY = Bytes.toBytes("cpf");
- private static final byte[] ASN_HIGH_WATER_MARK = Bytes.toBytes("hwm");
- private static HTable table;
+ private static final byte[] CP_NUM_AND_ASN_HWM = Bytes.toBytes("hwm");
+ private HTable table;
private boolean useAutoFlush;
private boolean disableBlockCache;
+ private static final int versions = 10;
+ private static int myClusterId = 0;
+ private int TlogRetryDelay;
+ private int TlogRetryCount;
public HBaseAuditControlPoint(Configuration config, Connection connection) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("Enter HBaseAuditControlPoint constructor()");
@@ -96,17 +100,46 @@ public class HBaseAuditControlPoint {
HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(CONTROL_POINT_TABLE_NAME));
HColumnDescriptor hcol = new HColumnDescriptor(CONTROL_POINT_FAMILY);
+ TlogRetryDelay = 3000; // 3 seconds
+ try {
+ String retryDelayS = System.getenv("TM_TLOG_RETRY_DELAY");
+ if (retryDelayS != null){
+ TlogRetryDelay = (Integer.parseInt(retryDelayS) > TlogRetryDelay ? Integer.parseInt(retryDelayS) : TlogRetryDelay);
+ }
+ }
+ catch (NumberFormatException e) {
+ if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_RETRY_DELAY is not valid in ms.env");
+ }
+
+ TlogRetryCount = 40;
+ try {
+ String retryCountS = System.getenv("TM_TLOG_RETRY_COUNT");
+ if (retryCountS != null){
+ TlogRetryCount = (Integer.parseInt(retryCountS) > TlogRetryCount ? Integer.parseInt(retryCountS) : TlogRetryCount);
+ }
+ }
+ catch (NumberFormatException e) {
+ if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_RETRY_COUNT is not valid in ms.env");
+ }
+
disableBlockCache = false;
- String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
- if (blockCacheString != null){
- disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
+ try {
+ String blockCacheString = System.getenv("TM_TLOG_DISABLE_BLOCK_CACHE");
+ if (blockCacheString != null){
+ disableBlockCache = (Integer.parseInt(blockCacheString) != 0);
if (LOG.isDebugEnabled()) LOG.debug("disableBlockCache != null");
+ }
+ }
+ catch (NumberFormatException e) {
+ if (LOG.isDebugEnabled()) LOG.debug("TM_TLOG_DISABLE_BLOCK_CACHE is not valid in ms.env");
}
LOG.info("disableBlockCache is " + disableBlockCache);
if (disableBlockCache) {
hcol.setBlockCacheEnabled(false);
}
+ hcol.setMaxVersions(versions);
+
desc.addFamily(hcol);
useAutoFlush = true;
@@ -120,7 +153,6 @@ public class HBaseAuditControlPoint {
boolean lvControlPointExists = admin.tableExists(TableName.valueOf(CONTROL_POINT_TABLE_NAME));
if (LOG.isDebugEnabled()) LOG.debug("HBaseAuditControlPoint lvControlPointExists " + lvControlPointExists);
currControlPt = -1;
- admin.close();
if (lvControlPointExists == false) {
try {
if (LOG.isDebugEnabled()) LOG.debug("Creating the table " + CONTROL_POINT_TABLE_NAME);
@@ -128,7 +160,11 @@ public class HBaseAuditControlPoint {
currControlPt = 1;
}
catch (TableExistsException e) {
- LOG.error("Table " + CONTROL_POINT_TABLE_NAME + " already exists");
+ LOG.error("Table " + CONTROL_POINT_TABLE_NAME + " already exists", e);
+ throw new IOException(e);
+ }
+ finally{
+ admin.close();
}
}
if (LOG.isDebugEnabled()) LOG.debug("try new HTable");
@@ -136,7 +172,8 @@ public class HBaseAuditControlPoint {
table.setAutoFlushTo(this.useAutoFlush);
if (currControlPt == -1){
- currControlPt = getCurrControlPt();
+ if (LOG.isDebugEnabled()) LOG.debug("getting currControlPt for clusterId " + myClusterId);
+ currControlPt = getCurrControlPt(myClusterId);
}
if (LOG.isDebugEnabled()) LOG.debug("currControlPt is " + currControlPt);
@@ -144,167 +181,198 @@ public class HBaseAuditControlPoint {
return;
}
- public long getCurrControlPt() throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("getCurrControlPt: start");
- long highKey = -1;
- if (LOG.isDebugEnabled()) LOG.debug("new Scan");
- Scan s = new Scan();
- s.setCaching(10);
- s.setCacheBlocks(false);
- if (LOG.isDebugEnabled()) LOG.debug("resultScanner");
- ResultScanner ss = table.getScanner(s);
- try {
- long currKey;
- String rowKey;
- if (LOG.isDebugEnabled()) LOG.debug("entering for loop" );
- for (Result r : ss) {
- rowKey = new String(r.getRow());
- if (LOG.isDebugEnabled()) LOG.debug("rowKey is " + rowKey );
- currKey = Long.parseLong(rowKey);
- if (LOG.isDebugEnabled()) LOG.debug("value is " + Long.parseLong(Bytes.toString(r.value())));
- if (currKey > highKey) {
- if (LOG.isDebugEnabled()) LOG.debug("Setting highKey to " + currKey);
- highKey = currKey;
+ public long getCurrControlPt(final int clusterId) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("getCurrControlPt: start, clusterId " + clusterId);
+ long lvCpNum = 1;
+
+ Get g = new Get(Bytes.toBytes(clusterId));
+ if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt attempting table.get");
+ Result r = table.get(g);
+ if (r.isEmpty())
+ return lvCpNum;
+ if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt Result: " + r);
+ String value = new String(Bytes.toString(r.getValue(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM)));
+ // In theory the above value is the latestversion of the column
+ if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt for clusterId: " + clusterId + ", valueString is " + value);
+ StringTokenizer stok = new StringTokenizer(value, ",");
+ if (stok.hasMoreElements()) {
+ if (LOG.isTraceEnabled()) LOG.trace("Parsing record in getCurrControlPt");
+ String ctrlPtToken = stok.nextElement().toString();
+ lvCpNum = Long.parseLong(ctrlPtToken, 10);
+ if (LOG.isTraceEnabled()) LOG.trace("Value for getCurrControlPt and clusterId: "
+ + clusterId + " is: " + lvCpNum);
+ }
+ return lvCpNum;
+ }
+
+ public long putRecord(final int clusterId, final long ControlPt, final long startingSequenceNumber) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("putRecord clusterId: " + clusterId + ", startingSequenceNumber (" + startingSequenceNumber + ")");
+ Put p = new Put(Bytes.toBytes(clusterId));
+ p.add(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM,
+ Bytes.toBytes(String.valueOf(ControlPt) + ","
+ + String.valueOf(startingSequenceNumber)));
+ boolean complete = false;
+ int retries = 0;
+ do {
+ try {
+ retries++;
+ if (LOG.isTraceEnabled()) LOG.trace("try table.put with cluster Id: " + clusterId + " and startingSequenceNumber " + startingSequenceNumber);
+ table.put(p);
+ if (useAutoFlush == false) {
+ if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record");
+ table.flushCommits();
+ }
+ complete = true;
+ if (retries > 1){
+ if (LOG.isTraceEnabled()) LOG.trace("Retry successful in putRecord for cp: " + ControlPt + " on table "
+ + table.getTableName().toString());
}
}
- } finally {
- ss.close();
- }
- if (LOG.isDebugEnabled()) LOG.debug("getCurrControlPt returning " + highKey);
- return highKey;
- }
-
- public long putRecord(final long ControlPt, final long startingSequenceNumber) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("putRecord starting sequence number (" + String.valueOf(startingSequenceNumber) + ")");
- String controlPtString = new String(String.valueOf(ControlPt));
- Put p = new Put(Bytes.toBytes(controlPtString));
- p.add(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK, Bytes.toBytes(String.valueOf(startingSequenceNumber)));
- if (LOG.isTraceEnabled()) LOG.trace("try table.put with starting sequence number " + startingSequenceNumber);
- table.put(p);
- if (useAutoFlush == false) {
- if (LOG.isTraceEnabled()) LOG.trace("flushing controlpoint record");
- table.flushCommits();
- }
+ catch (IOException e){
+ LOG.error("Retrying putRecord on control point: " + ControlPt + " on control point table "
+ + table.getTableName().toString() + " due to Exception " + e);
+// locator.getRegionLocation(p.getRow(), true);
+ table.getRegionLocation(p.getRow(), true);
+ try {
+ Thread.sleep(TlogRetryDelay); // 3 second default
+ } catch (InterruptedException ie) {
+ }
+ if (retries == TlogRetryCount){
+ LOG.error("putRecord aborting due to excessive retries on on control point table : "
+ + table.getTableName().toString() + " due to Exception; aborting ");
+ System.exit(1);
+ }
+ }
+ } while (! complete && retries < TlogRetryCount); // default give up after 5 minutes
if (LOG.isTraceEnabled()) LOG.trace("HBaseAuditControlPoint:putRecord returning " + ControlPt);
return ControlPt;
}
- public ArrayList<String> getRecordList(String controlPt) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("getRecord");
- ArrayList<String> transactionList = new ArrayList<String>();
- Get g = new Get(Bytes.toBytes(controlPt));
- Result r = table.get(g);
- byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
- String recordString = new String(currValue);
- if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
- StringTokenizer st = new StringTokenizer(recordString, ",");
- while (st.hasMoreElements()) {
- String token = st.nextElement().toString() ;
- if (LOG.isDebugEnabled()) LOG.debug("token is " + token);
- transactionList.add(token);
- }
-
- if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit with list size (" + transactionList.size() + ")");
- return transactionList;
-
- }
+ public long getRecord(final int clusterId, final String controlPt) throws IOException {
- public long getRecord(final String controlPt) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("getRecord " + controlPt);
+ if (LOG.isTraceEnabled()) LOG.trace("getRecord clusterId: " + clusterId + " controlPt: " + controlPt);
long lvValue = -1;
- Get g = new Get(Bytes.toBytes(controlPt));
- String recordString;
- Result r = table.get(g);
- byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
- if (currValue != null)
- {
- recordString = new String (Bytes.toString(currValue));
- if (LOG.isDebugEnabled()) LOG.debug("recordString is " + recordString);
- lvValue = Long.parseLong(recordString, 10);
- }
- if (LOG.isTraceEnabled()) LOG.trace("getRecord - exit " + lvValue);
+ Get g = new Get(Bytes.toBytes(clusterId));
+ g.setMaxVersions(versions); // will return last n versions of row
+ g.addColumn(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM);
+ String ctrlPtToken;
+ String asnToken;
+ Result r = table.get(g);
+ if (r.isEmpty())
+ return lvValue;
+ List<Cell> list = r.getColumnCells(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM); // returns all versions of this column
+ for (Cell element : list) {
+ StringTokenizer stok = new StringTokenizer(Bytes.toString(CellUtil.cloneValue(element)), ",");
+ if (stok.hasMoreElements()) {
+ ctrlPtToken = stok.nextElement().toString();
+ if (LOG.isTraceEnabled()) LOG.trace("Parsing record for controlPt (" + ctrlPtToken + ")");
+ asnToken = stok.nextElement().toString();
+ if (Long.parseLong(ctrlPtToken, 10) == Long.parseLong(controlPt, 10)){
+ // This is the one we are looking for
+ lvValue = Long.parseLong(asnToken, 10);
+ if (LOG.isTraceEnabled()) LOG.trace("ASN value for controlPt: " + controlPt + " is: " + lvValue);
+ return lvValue;
+ }
+ }
+ else {
+ if (LOG.isTraceEnabled()) LOG.trace("No tokens to parse for controlPt (" + controlPt + ")");
+ }
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("all results scannned for clusterId: " + clusterId + ", but controlPt: " + controlPt + " not found");
return lvValue;
+ }
- }
-
- public long getStartingAuditSeqNum() throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum");
- String controlPtString = new String(String.valueOf(currControlPt));
- long lvAsn;
- if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum new get for control point " + currControlPt);
- Get g = new Get(Bytes.toBytes(controlPtString));
- if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum setting result");
- Result r = table.get(g);
- if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum currValue CONTROL_POINT_FAMILY is "
- + CONTROL_POINT_FAMILY + " ASN_HIGH_WATER_MARK " + ASN_HIGH_WATER_MARK);
- byte [] currValue = r.getValue(CONTROL_POINT_FAMILY, ASN_HIGH_WATER_MARK);
- if (LOG.isDebugEnabled()) LOG.debug("Starting asn setting recordString ");
- if (currValue == null)
- lvAsn = 1;
- else
- {
- String recordString = new String(currValue);
- lvAsn = Long.valueOf(recordString);
- }
+ public long getStartingAuditSeqNum(final int clusterId) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum for clusterId: " + clusterId);
+ long lvAsn = 1;
+
+ Get g = new Get(Bytes.toBytes(clusterId));
+ String asnToken;
+ if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum attempting table.get");
+ Result r = table.get(g);
+ if (r.isEmpty())
+ return lvAsn;
+ if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum Result: " + r);
+ String value = new String(Bytes.toString(r.getValue(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM)));
+ // In theory the above value is the latestversion of the column
+ if (LOG.isDebugEnabled()) LOG.debug("getStartingAuditSeqNum for clusterId: " + clusterId + ", valueString is " + value);
+ StringTokenizer stok = new StringTokenizer(value, ",");
+ if (stok.hasMoreElements()) {
+ if (LOG.isTraceEnabled()) LOG.trace("Parsing record in getStartingAuditSeqNum");
+ stok.nextElement(); // skip the control point token
+ asnToken = stok.nextElement().toString();
+ lvAsn = Long.parseLong(asnToken, 10);
+ if (LOG.isTraceEnabled()) LOG.trace("Value for getStartingAuditSeqNum and clusterId: "
+ + clusterId + " is: " + lvAsn);
+ }
if (LOG.isTraceEnabled()) LOG.trace("getStartingAuditSeqNum - exit returning " + lvAsn);
return lvAsn;
}
- public long getNextAuditSeqNum(int nid) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum for node: " + nid);
+ public long getNextAuditSeqNum(int clusterId, int nid) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum for cluster " + clusterId + " node: " + nid);
// We need to open the appropriate control point table and read the value from it
Table remoteTable;
String lv_tName = new String("TRAFODION._DTM_.TLOG" + String.valueOf(nid) + "_CONTROL_POINT");
- Connection remoteConnection = ConnectionFactory.createConnection(this.config);
- remoteTable = remoteConnection.getTable(TableName.valueOf(lv_tName));
+ remoteTable = connection.getTable(TableName.valueOf(lv_tName));
+
+ long lvAsn = 1;
- long highValue = -1;
try {
- Scan s = new Scan();
- s.setCaching(10);
- s.setCacheBlocks(false);
- if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum resultScanner");
- ResultScanner ss = remoteTable.getScanner(s);
- try {
- long currValue;
- String rowKey;
- if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum entering for loop" );
- for (Result r : ss) {
- rowKey = new String(r.getRow());
- if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum rowKey is " + rowKey );
- currValue = Long.parseLong(Bytes.toString(r.value()));
- if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum value is " + currValue);
- if (currValue > highValue) {
- if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum Setting highValue to " + currValue);
- highValue = currValue;
- }
+ Get g = new Get(Bytes.toBytes(clusterId));
+ if (LOG.isDebugEnabled()) LOG.debug("getNextAuditSeqNum attempting remoteTable.get");
+ Result r = remoteTable.get(g);
+ if (!r.isEmpty()){
+ if (LOG.isDebugEnabled()) LOG.debug("getNextAuditSeqNum Result: " + r);
+ String value = new String(Bytes.toString(r.getValue(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM)));
+ // In theory the above value is the latest version of the column
+ if (LOG.isDebugEnabled()) LOG.debug("getNextAuditSeqNum for clusterId: " + clusterId + ", valueString is " + value);
+ StringTokenizer stok = new StringTokenizer(value, ",");
+ if (stok.hasMoreElements()) {
+ if (LOG.isTraceEnabled()) LOG.trace("Parsing record in getNextAuditSeqNum");
+ stok.nextElement(); // skip the control point token
+ String asnToken = stok.nextElement().toString();
+ lvAsn = Long.parseLong(asnToken, 10);
+ if (LOG.isTraceEnabled()) LOG.trace("Value for getNextAuditSeqNum and clusterId: "
+ + clusterId + " is: " + (lvAsn + 1));
}
- } finally {
- ss.close();
}
} finally {
- try {
- remoteTable.close();
- remoteConnection.close();
- }
- catch (IOException e) {
- LOG.error("getNextAuditSeqNum IOException closing table or connection for " + lv_tName);
- e.printStackTrace();
- }
+ remoteTable.close();
}
- if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum returning " + (highValue + 1));
- return (highValue + 1);
- }
+ if (LOG.isTraceEnabled()) LOG.trace("getNextAuditSeqNum returning " + (lvAsn + 1));
+ return (lvAsn + 1);
+ }
+
+ public long doControlPoint(final int clusterId, final long sequenceNumber, final boolean incrementCP) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("doControlPoint start");
- public long doControlPoint(final long sequenceNumber) throws IOException {
- currControlPt++;
- if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), sequenceNumber (" + sequenceNumber+ ") try putRecord");
- putRecord(currControlPt, sequenceNumber);
+ if (incrementCP) {
+ currControlPt++;
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("doControlPoint interval (" + currControlPt + "), clusterId: " + clusterId + ", sequenceNumber (" + sequenceNumber+ ") try putRecord");
+ putRecord(clusterId, currControlPt, sequenceNumber);
+ if (LOG.isTraceEnabled()) LOG.trace("doControlPoint - exit");
return currControlPt;
}
+ public long bumpControlPoint(final int clusterId, final int count) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("bumpControlPoint start, count: " + count);
+ long currASN = -1;
+ currControlPt = getCurrControlPt(clusterId);
+ currASN = getStartingAuditSeqNum(clusterId);
+ for ( int i = 0; i < count; i++ ) {
+ currControlPt++;
+ if (LOG.isTraceEnabled()) LOG.trace("bumpControlPoint putting new record " + (i + 1) + " for control point ("
+ + currControlPt + "), clusterId: " + clusterId + ", ASN (" + currASN + ")");
+ putRecord(clusterId, currControlPt, currASN);
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("bumpControlPoint - exit");
+ return currASN;
+ }
+
public boolean deleteRecord(final long controlPoint) throws IOException {
if (LOG.isTraceEnabled()) LOG.trace("deleteRecord start for control point " + controlPoint);
String controlPtString = new String(String.valueOf(controlPoint));
@@ -317,33 +385,80 @@ public class HBaseAuditControlPoint {
return true;
}
- public boolean deleteAgedRecords(final long controlPoint) throws IOException {
- if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords start - control point " + controlPoint);
+ public boolean deleteAgedRecords(final int clusterId, final long controlPoint) throws IOException {
+ if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords start - clusterId " + clusterId + " control point " + controlPoint);
String controlPtString = new String(String.valueOf(controlPoint));
- Scan s = new Scan();
- s.setCaching(10);
- s.setCacheBlocks(false);
ArrayList<Delete> deleteList = new ArrayList<Delete>();
- ResultScanner ss = table.getScanner(s);
- try {
- String rowKey;
- for (Result r : ss) {
- rowKey = new String(r.getRow());
- if (Long.parseLong(rowKey) < controlPoint) {
- if (LOG.isDebugEnabled()) LOG.debug("Adding (" + rowKey + ") to delete list");
- Delete del = new Delete(rowKey.getBytes());
- deleteList.add(del);
+ Get g = new Get(Bytes.toBytes(clusterId));
+ g.setMaxVersions(versions); // will return last n versions of row
+ g.addColumn(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM);
+ String ctrlPtToken;
+ Result r = table.get(g);
+ if (r.isEmpty())
+ return false;
+ List<Cell> list = r.getColumnCells(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM); // returns all versions of this column
+ for (Cell cell : list) {
+ StringTokenizer stok =
+ new StringTokenizer(Bytes.toString(CellUtil.cloneValue(cell)), ",");
+ if (stok.hasMoreElements()) {
+ ctrlPtToken = stok.nextElement().toString();
+ if (LOG.isTraceEnabled()) LOG.trace("Parsing record for controlPoint (" + ctrlPtToken + ")");
+ if (Long.parseLong(ctrlPtToken, 10) <= controlPoint){
+ // This is one we are looking for
+ Delete del = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), cell.getTimestamp());
+ deleteList.add(del);
+ if (LOG.isTraceEnabled()) LOG.trace("Deleting entry for ctrlPtToken: " + ctrlPtToken);
+ }
+ }
+ else {
+ if (LOG.isTraceEnabled()) LOG.trace("No tokens to parse for controlPoint (" + controlPoint + ")");
}
}
if (LOG.isDebugEnabled()) LOG.debug("attempting to delete list with " + deleteList.size() + " elements");
table.delete(deleteList);
- } finally {
- ss.close();
- }
-
if (LOG.isTraceEnabled()) LOG.trace("deleteAgedRecords - exit");
return true;
}
+
+ public String getTableName(){
+ return CONTROL_POINT_TABLE_NAME;
+ }
+
+ public long getNthRecord(int clusterId, int n) throws IOException{
+ if (LOG.isTraceEnabled()) LOG.trace("getNthRecord start - clusterId " + clusterId + " n: " + n);
+
+ Get g = new Get(Bytes.toBytes(clusterId));
+ g.setMaxVersions(n + 1); // will return last n+1 versions of row just in case
+ g.addColumn(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM);
+ String ctrlPtToken;
+ long lvReturn = 1;
+ Result r = table.get(g);
+ if (r.isEmpty())
+ return lvReturn;
+ List<Cell> list = r.getColumnCells(CONTROL_POINT_FAMILY, CP_NUM_AND_ASN_HWM); // returns all versions of this column
+ int i = 0;
+ for (Cell cell : list) {
+ i++;
+ StringTokenizer stok =
+ new StringTokenizer(Bytes.toString(CellUtil.cloneValue(cell)), ",");
+ if (stok.hasMoreElements()) {
+ ctrlPtToken = stok.nextElement().toString();
+ if (LOG.isTraceEnabled()) LOG.trace("Parsing record for controlPoint (" + ctrlPtToken + ")");
+ if ( i < n ){
+ if (LOG.isTraceEnabled()) LOG.trace("Skipping record " + i + " of " + n + " for controlPoint" );
+ continue;
+ }
+ lvReturn = Long.parseLong(ctrlPtToken);;
+ if (LOG.isTraceEnabled()) LOG.trace("getNthRecord exit - returning " + lvReturn);
+ return lvReturn;
+ }
+ else {
+ if (LOG.isTraceEnabled()) LOG.trace("No tokens to parse for " + i);
+ }
+ }
+ if (LOG.isTraceEnabled()) LOG.trace("getNthRecord - exit returning 1");
+ return 1;
+ }
}