You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/12 23:23:52 UTC
[4/4] incubator-tephra git commit: (TEPHRA-240) Include conflicting
key and client id in TransactionConflictException
(TEPHRA-240) Include conflicting key and client id in TransactionConflictException
This closes #47 from GitHub.
Signed-off-by: anew <an...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/174c3325
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/174c3325
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/174c3325
Branch: refs/heads/master
Commit: 174c3325366e33ea7979f2ecfa621b68d77b8a35
Parents: 810c9dd
Author: anew <an...@apache.org>
Authored: Sun Sep 10 21:33:53 2017 -0700
Committer: anew <an...@apache.org>
Committed: Tue Sep 12 16:22:58 2017 -0700
----------------------------------------------------------------------
.../tephra/TransactionConflictException.java | 39 +
.../org/apache/tephra/TransactionContext.java | 32 +-
.../org/apache/tephra/TransactionManager.java | 187 +-
.../apache/tephra/TransactionSystemClient.java | 31 +-
.../java/org/apache/tephra/TxConstants.java | 11 +
.../distributed/TransactionServiceClient.java | 74 +-
.../TransactionServiceThriftClient.java | 42 +-
.../TransactionServiceThriftHandler.java | 39 +-
.../thrift/TTransactionConflictException.java | 602 ++++++
.../distributed/thrift/TTransactionServer.java | 1818 ++++++++++++++----
.../tephra/inmemory/DetachedTxSystemClient.java | 15 +-
.../tephra/inmemory/InMemoryTxSystemClient.java | 27 +-
.../tephra/inmemory/MinimalTxSystemClient.java | 9 +-
.../tephra/persist/TransactionSnapshot.java | 12 +-
tephra-core/src/main/thrift/transaction.thrift | 26 +-
.../java/org/apache/tephra/ClientIdTest.java | 114 ++
.../java/org/apache/tephra/DummyTxAware.java | 123 ++
.../java/org/apache/tephra/DummyTxClient.java | 91 +
.../apache/tephra/TransactionContextTest.java | 207 +-
.../apache/tephra/TransactionExecutorTest.java | 211 +-
.../apache/tephra/TransactionManagerTest.java | 106 +-
.../apache/tephra/TransactionSystemTest.java | 151 +-
.../ThriftTransactionServerTest.java | 4 +-
.../AbstractTransactionStateStorageTest.java | 22 +-
.../tephra/snapshot/SnapshotCodecTest.java | 4 +-
.../coprocessor/TransactionProcessorTest.java | 4 +-
.../TransactionVisibilityFilterTest.java | 13 +-
.../coprocessor/TransactionProcessorTest.java | 4 +-
.../TransactionVisibilityFilterTest.java | 13 +-
.../coprocessor/TransactionProcessorTest.java | 4 +-
.../TransactionVisibilityFilterTest.java | 13 +-
.../coprocessor/TransactionProcessorTest.java | 4 +-
.../TransactionVisibilityFilterTest.java | 13 +-
.../coprocessor/TransactionProcessorTest.java | 10 +-
.../TransactionVisibilityFilterTest.java | 13 +-
.../coprocessor/TransactionProcessorTest.java | 4 +-
.../TransactionVisibilityFilterTest.java | 13 +-
37 files changed, 2997 insertions(+), 1108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java
----------------------------------------------------------------------
diff --git a/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java b/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java
index d07ed04..d3bd180 100644
--- a/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java
+++ b/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java
@@ -22,11 +22,50 @@ package org.apache.tephra;
* Thrown to indicate transaction conflict occurred when trying to commit a transaction.
*/
public class TransactionConflictException extends TransactionFailureException {
+
+ private final Long transactionId;
+ private final String conflictingKey;
+ private final String conflictingClient;
+
+ /**
+ * @deprecated since 0.13-incubating. Use {@link #TransactionConflictException(long, String, String)} instead.
+ */
+ @Deprecated
public TransactionConflictException(String message) {
super(message);
+ transactionId = null;
+ conflictingKey = null;
+ conflictingClient = null;
}
+ /**
+ * @deprecated since 0.13-incubating. Use {@link #TransactionConflictException(long, String, String)} instead.
+ */
+ @Deprecated
public TransactionConflictException(String message, Throwable cause) {
super(message, cause);
+ transactionId = null;
+ conflictingKey = null;
+ conflictingClient = null;
+ }
+
+ public TransactionConflictException(long transactionId, String conflictingKey, String conflictingClient) {
+ super(String.format("Transaction %d conflicts with %s on change key '%s'", transactionId,
+ conflictingClient == null ? "unknown client" : conflictingClient, conflictingKey));
+ this.transactionId = transactionId;
+ this.conflictingKey = conflictingKey;
+ this.conflictingClient = conflictingClient;
+ }
+
+ public Long getTransactionId() {
+ return transactionId;
+ }
+
+ public String getConflictingKey() {
+ return conflictingKey;
+ }
+
+ public String getConflictingClient() {
+ return conflictingClient;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
index 8b4e4fd..3c11e96 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -267,23 +267,16 @@ public class TransactionContext {
// abort will throw that exception
}
}
-
- boolean canCommit = false;
try {
- canCommit = txClient.canCommitOrThrow(currentTx, changes);
- } catch (TransactionNotInProgressException | TransactionSizeException e) {
- throw e;
- // abort will throw that exception
+ txClient.canCommitOrThrow(currentTx, changes);
+ } catch (TransactionFailureException e) {
+ abort(e);
+ // abort will rethrow this exception
} catch (Throwable e) {
String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId());
abort(new TransactionFailureException(message, e));
// abort will throw that exception
}
- if (!canCommit) {
- String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
- abort(new TransactionConflictException(message));
- // abort will throw
- }
}
private void persist() throws TransactionFailureException {
@@ -311,25 +304,16 @@ public class TransactionContext {
}
private void commit() throws TransactionFailureException {
- boolean commitSuccess = false;
try {
- commitSuccess = txClient.commit(currentTx);
- } catch (TransactionNotInProgressException e) {
- String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
- LOG.warn(message, e);
- abort(new TransactionFailureException(message, e));
- // abort will throw that exception
+ txClient.commitOrThrow(currentTx);
+ } catch (TransactionFailureException e) {
+ abort(e);
+ // abort will rethrow this exception
} catch (Throwable e) {
String message = String.format("Exception from commit for transaction %d.", currentTx.getTransactionId());
- LOG.warn(message, e);
abort(new TransactionFailureException(message, e));
// abort will throw that exception
}
- if (!commitSuccess) {
- String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
- abort(new TransactionConflictException(message));
- // abort will throw
- }
}
private void postCommit() throws TransactionFailureException {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 4479812..68450c9 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -130,9 +130,9 @@ public class TransactionManager extends AbstractService {
// todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil)
// todo: should this be consolidated with inProgress?
// commit time next writePointer -> changes made by this tx
- private final NavigableMap<Long, Set<ChangeId>> committedChangeSets = new ConcurrentSkipListMap<>();
+ private final NavigableMap<Long, ChangeSet> committedChangeSets = new ConcurrentSkipListMap<>();
// not committed yet
- private final Map<Long, Set<ChangeId>> committingChangeSets = Maps.newConcurrentMap();
+ private final Map<Long, ChangeSet> committingChangeSets = Maps.newConcurrentMap();
private long readPointer;
private long lastWritePointer;
@@ -157,6 +157,10 @@ public class TransactionManager extends AbstractService {
private DaemonThreadExecutor snapshotThread;
private DaemonThreadExecutor metricsThread;
+ // retention of client id for transactions - this affects memory footprint
+ private final boolean retainClientId;
+ private final boolean retainClientIdPastCommit;
+
// lock guarding change of the current transaction log
private final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
private final Lock logReadLock = logLock.readLock();
@@ -206,12 +210,19 @@ public class TransactionManager extends AbstractService {
// TODO: REMOVE WITH txnBackwardsCompatCheck()
longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000);
- //
+ ClientIdRetention retention = ClientIdRetention.valueOf(
+ conf.get(TxConstants.Manager.CFG_TX_RETAIN_CLIENT_ID,
+ TxConstants.Manager.DEFAULT_TX_RETAIN_CLIENT_ID).toUpperCase());
+ this.retainClientId = retention != ClientIdRetention.OFF;
+ this.retainClientIdPastCommit = retention == ClientIdRetention.COMMITTED;
+
this.txMetricsCollector = txMetricsCollector;
this.txMetricsCollector.configure(conf);
clear();
}
+ enum ClientIdRetention { OFF, ACTIVE, COMMITTED }
+
private void clear() {
invalidTxList.clear();
inProgress.clear();
@@ -400,7 +411,7 @@ public class TransactionManager extends AbstractService {
}
public synchronized TransactionSnapshot getSnapshot() throws IOException {
- TransactionSnapshot snapshot = null;
+ TransactionSnapshot snapshot;
if (!isRunning() && !isStopping()) {
return null;
}
@@ -436,8 +447,8 @@ public class TransactionManager extends AbstractService {
private void doSnapshot(boolean closing) throws IOException {
long snapshotTime = 0L;
- TransactionSnapshot snapshot = null;
- TransactionLog oldLog = null;
+ TransactionSnapshot snapshot;
+ TransactionLog oldLog;
try {
this.logWriteLock.lock();
try {
@@ -521,8 +532,12 @@ public class TransactionManager extends AbstractService {
lastWritePointer = snapshot.getWritePointer();
invalidTxList.addAll(snapshot.getInvalid());
inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress()));
- committingChangeSets.putAll(snapshot.getCommittingChangeSets());
- committedChangeSets.putAll(snapshot.getCommittedChangeSets());
+ for (Map.Entry<Long, Set<ChangeId>> entry : snapshot.getCommittingChangeSets().entrySet()) {
+ committingChangeSets.put(entry.getKey(), new ChangeSet(null, entry.getValue()));
+ }
+ for (Map.Entry<Long, Set<ChangeId>> entry : snapshot.getCommittedChangeSets().entrySet()) {
+ committedChangeSets.put(entry.getKey(), new ChangeSet(null, entry.getValue()));
+ }
}
/**
@@ -593,7 +608,7 @@ public class TransactionManager extends AbstractService {
if (reader == null) {
continue;
}
- TransactionEdit edit = null;
+ TransactionEdit edit;
while ((edit = reader.next()) != null) {
editCnt++;
switch (edit.getState()) {
@@ -613,7 +628,7 @@ public class TransactionManager extends AbstractService {
addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), expiration, type, null);
break;
case COMMITTING:
- addCommittingChangeSet(edit.getWritePointer(), edit.getChanges());
+ addCommittingChangeSet(edit.getWritePointer(), null, edit.getChanges());
break;
case COMMITTED:
// TODO: need to reconcile usage of transaction id v/s write pointer TEPHRA-140
@@ -621,7 +636,7 @@ public class TransactionManager extends AbstractService {
long[] checkpointPointers = edit.getCheckpointPointers();
long writePointer = checkpointPointers == null || checkpointPointers.length == 0 ?
transactionId : checkpointPointers[checkpointPointers.length - 1];
- doCommit(transactionId, writePointer, edit.getChanges(),
+ doCommit(transactionId, writePointer, new ChangeSet(null, edit.getChanges()),
edit.getCommitPointer(), edit.getCanCommit());
break;
case INVALID:
@@ -670,9 +685,7 @@ public class TransactionManager extends AbstractService {
throw new IllegalArgumentException("Invalid state for WAL entry: " + edit.getState());
}
}
- } catch (IOException ioe) {
- throw Throwables.propagate(ioe);
- } catch (InvalidTruncateTimeException e) {
+ } catch (IOException | InvalidTruncateTimeException e) {
throw Throwables.propagate(e);
}
LOG.info("Read " + editCnt + " edits from log " + log.getName());
@@ -815,7 +828,7 @@ public class TransactionManager extends AbstractService {
}
private Transaction startTx(long expiration, TransactionType type, @Nullable String clientId) {
- Transaction tx = null;
+ Transaction tx;
long txid;
// guard against changes to the transaction log while processing
this.logReadLock.lock();
@@ -824,7 +837,8 @@ public class TransactionManager extends AbstractService {
ensureAvailable();
txid = getNextWritePointer();
tx = createTransaction(txid, type);
- addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type, clientId);
+ addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type,
+ retainClientId ? clientId : null);
}
// appending to WAL out of global lock for concurrent performance
// we should still be able to arrive at the same state even if log entries are out of order
@@ -853,46 +867,42 @@ public class TransactionManager extends AbstractService {
}
}
- public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
- throws TransactionNotInProgressException, TransactionSizeException {
+ public void canCommit(long txId, Collection<byte[]> changeIds)
+ throws TransactionNotInProgressException, TransactionSizeException, TransactionConflictException {
txMetricsCollector.rate("canCommit");
Stopwatch timer = new Stopwatch().start();
- InProgressTx inProgressTx = inProgress.get(tx.getTransactionId());
+ InProgressTx inProgressTx = inProgress.get(txId);
if (inProgressTx == null) {
synchronized (this) {
// invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
- if (invalidTxList.contains(tx.getTransactionId())) {
+ if (invalidTxList.contains(txId)) {
throw new TransactionNotInProgressException(
String.format(
- "canCommit() is called for transaction %d that is not in progress (it is known to be invalid)",
- tx.getTransactionId()));
+ "canCommit() is called for transaction %d that is not in progress (it is known to be invalid)", txId));
} else {
throw new TransactionNotInProgressException(
- String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId()));
+ String.format("canCommit() is called for transaction %d that is not in progress", txId));
}
}
}
Set<ChangeId> set =
- validateChangeSet(tx, changeIds, inProgressTx.clientId != null ? inProgressTx.clientId : DEFAULT_CLIENTID);
+ validateChangeSet(txId, changeIds, inProgressTx.clientId != null ? inProgressTx.clientId : DEFAULT_CLIENTID);
+ checkForConflicts(txId, set);
- if (hasConflicts(tx, set)) {
- return false;
- }
// guard against changes to the transaction log while processing
this.logReadLock.lock();
try {
synchronized (this) {
ensureAvailable();
- addCommittingChangeSet(tx.getTransactionId(), set);
+ addCommittingChangeSet(txId, inProgressTx.getClientId(), set);
}
- appendToLog(TransactionEdit.createCommitting(tx.getTransactionId(), set));
+ appendToLog(TransactionEdit.createCommitting(txId, set));
} finally {
this.logReadLock.unlock();
}
txMetricsCollector.histogram("canCommit.latency", (int) timer.elapsedMillis());
- return true;
}
/**
@@ -905,19 +915,19 @@ public class TransactionManager extends AbstractService {
* @return the same set of changes, transformed into a set of {@link ChangeId}s.
* @throws TransactionSizeException if the number or total size of the changes exceed the limit.
*/
- private Set<ChangeId> validateChangeSet(Transaction tx, Collection<byte[]> changeIds,
+ private Set<ChangeId> validateChangeSet(long txId, Collection<byte[]> changeIds,
String clientId) throws TransactionSizeException {
if (changeIds.size() > changeSetCountLimit) {
LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries and exceeds " +
"the allowed size of {}. Limit the number of changes, or use a long-running transaction. ",
- tx.getTransactionId(), clientId, changeIds.size(), changeSetCountLimit);
+ txId, clientId, changeIds.size(), changeSetCountLimit);
throw new TransactionSizeException(String.format(
"Change set for transaction %d has %d entries and exceeds the limit of %d",
- tx.getTransactionId(), changeIds.size(), changeSetCountLimit));
+ txId, changeIds.size(), changeSetCountLimit));
} else if (changeIds.size() > changeSetCountThreshold) {
LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries. " +
"It is recommended to limit the number of changes to {}, or to use a long-running transaction. ",
- tx.getTransactionId(), clientId, changeIds.size(), changeSetCountThreshold);
+ txId, clientId, changeIds.size(), changeSetCountThreshold);
}
long byteCount = 0L;
Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size());
@@ -928,26 +938,28 @@ public class TransactionManager extends AbstractService {
if (byteCount > changeSetSizeLimit) {
LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes and exceeds " +
"the allowed size of {} bytes. Limit the total size of changes, or use a long-running transaction. ",
- tx.getTransactionId(), clientId, byteCount, changeSetSizeLimit);
+ txId, clientId, byteCount, changeSetSizeLimit);
throw new TransactionSizeException(String.format(
"Change set for transaction %d has total size of %d bytes and exceeds the limit of %d bytes",
- tx.getTransactionId(), byteCount, changeSetSizeLimit));
+ txId, byteCount, changeSetSizeLimit));
} else if (byteCount > changeSetSizeThreshold) {
LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes. " +
"It is recommended to limit the total size to {} bytes, or to use a long-running transaction. ",
- tx.getTransactionId(), clientId, byteCount, changeSetSizeThreshold);
+ txId, clientId, byteCount, changeSetSizeThreshold);
}
return set;
}
- private void addCommittingChangeSet(long writePointer, Set<ChangeId> changes) {
- committingChangeSets.put(writePointer, changes);
+ private void addCommittingChangeSet(long writePointer, String clientId, Set<ChangeId> changes) {
+ committingChangeSets.put(writePointer, new ChangeSet(retainClientIdPastCommit ? clientId : null, changes));
}
- public boolean commit(Transaction tx) throws TransactionNotInProgressException {
+ public void commit(long txId, long writePointer)
+ throws TransactionNotInProgressException, TransactionConflictException {
+
txMetricsCollector.rate("commit");
Stopwatch timer = new Stopwatch().start();
- Set<ChangeId> changeSet = null;
+ ChangeSet changeSet;
boolean addToCommitted = true;
long commitPointer;
// guard against changes to the transaction log while processing
@@ -958,57 +970,55 @@ public class TransactionManager extends AbstractService {
// we record commits at the first not-yet assigned transaction id to simplify clearing out change sets that
// are no longer visible by any in-progress transactions
commitPointer = lastWritePointer + 1;
- if (inProgress.get(tx.getTransactionId()) == null) {
+ if (inProgress.get(txId) == null) {
// invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
- if (invalidTxList.contains(tx.getTransactionId())) {
+ if (invalidTxList.contains(txId)) {
throw new TransactionNotInProgressException(
String.format("canCommit() is called for transaction %d that is not in progress " +
- "(it is known to be invalid)", tx.getTransactionId()));
+ "(it is known to be invalid)", txId));
} else {
throw new TransactionNotInProgressException(
- String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId()));
+ String.format("canCommit() is called for transaction %d that is not in progress", txId));
}
}
// these should be atomic
// NOTE: whether we succeed or not we don't need to keep changes in committing state: same tx cannot
// be attempted to commit twice
- changeSet = committingChangeSets.remove(tx.getTransactionId());
+ changeSet = committingChangeSets.remove(txId);
if (changeSet != null) {
// double-checking if there are conflicts: someone may have committed since canCommit check
- if (hasConflicts(tx, changeSet)) {
- return false;
- }
+ checkForConflicts(txId, changeSet.getChangeIds());
} else {
// no changes
addToCommitted = false;
}
- doCommit(tx.getTransactionId(), tx.getWritePointer(), changeSet, commitPointer, addToCommitted);
+ doCommit(txId, writePointer, changeSet, commitPointer, addToCommitted);
}
- appendToLog(TransactionEdit.createCommitted(tx.getTransactionId(), changeSet, commitPointer, addToCommitted));
+ appendToLog(TransactionEdit.createCommitted(txId, changeSet == null ? null : changeSet.getChangeIds(),
+ commitPointer, addToCommitted));
} finally {
this.logReadLock.unlock();
}
txMetricsCollector.histogram("commit.latency", (int) timer.elapsedMillis());
- return true;
}
- private void doCommit(long transactionId, long writePointer, Set<ChangeId> changes, long commitPointer,
+ private void doCommit(long transactionId, long writePointer, ChangeSet changes, long commitPointer,
boolean addToCommitted) {
// In case this method is called when loading a previous WAL, we need to remove the tx from these sets
committingChangeSets.remove(transactionId);
- if (addToCommitted && !changes.isEmpty()) {
+ if (addToCommitted && !changes.getChangeIds().isEmpty()) {
// No need to add empty changes to the committed change sets, they will never trigger any conflict
// Record the committed change set with the next writePointer as the commit time.
// NOTE: we use current next writePointer as key for the map, hence we may have multiple txs changesets to be
// stored under one key
- Set<ChangeId> changeIds = committedChangeSets.get(commitPointer);
- if (changeIds != null) {
+ ChangeSet committed = committedChangeSets.get(commitPointer);
+ if (committed != null) {
// NOTE: we modify the new set to prevent concurrent modification exception, as other threads (e.g. in
// canCommit) use it unguarded
- changes.addAll(changeIds);
+ changes.getChangeIds().addAll(committed.getChangeIds());
}
committedChangeSets.put(commitPointer, changes);
}
@@ -1112,7 +1122,7 @@ public class TransactionManager extends AbstractService {
}
private boolean doInvalidate(long writePointer) {
- Set<ChangeId> previousChangeSet = committingChangeSets.remove(writePointer);
+ ChangeSet previousChangeSet = committingChangeSets.remove(writePointer);
// remove from in-progress set, so that it does not get excluded in the future
InProgressTx previous = inProgress.remove(writePointer);
@@ -1224,9 +1234,9 @@ public class TransactionManager extends AbstractService {
txMetricsCollector.rate("checkpoint");
Stopwatch timer = new Stopwatch().start();
- Transaction checkpointedTx = null;
+ Transaction checkpointedTx;
long txId = originalTx.getTransactionId();
- long newWritePointer = 0;
+ long newWritePointer;
// guard against changes to the transaction log while processing
this.logReadLock.lock();
try {
@@ -1284,39 +1294,49 @@ public class TransactionManager extends AbstractService {
return this.committedChangeSets.size();
}
- private boolean hasConflicts(Transaction tx, Set<ChangeId> changeIds) {
+ @Nullable
+ @VisibleForTesting
+ InProgressTx getInProgress(long transactionId) {
+ return inProgress.get(transactionId);
+ }
+
+ private void checkForConflicts(long txId, Set<ChangeId> changeIds) throws TransactionConflictException {
if (changeIds.isEmpty()) {
- return false;
+ return;
}
- for (Map.Entry<Long, Set<ChangeId>> changeSet : committedChangeSets.entrySet()) {
+ for (Map.Entry<Long, ChangeSet> committed : committedChangeSets.entrySet()) {
// If commit time is greater than tx read-pointer,
// basically not visible but committed means "tx committed after given tx was started"
- if (changeSet.getKey() > tx.getTransactionId()) {
- if (overlap(changeSet.getValue(), changeIds)) {
- return true;
+ if (committed.getKey() > txId) {
+ ChangeId change = overlap(committed.getValue().getChangeIds(), changeIds);
+ if (change != null) {
+ throw new TransactionConflictException(txId, change.toString(), committed.getValue().getClientId());
}
}
}
- return false;
}
- private boolean overlap(Set<ChangeId> a, Set<ChangeId> b) {
+ /**
+ * Checks for overlap in two change sets, returns the first common change it finds, or null if no overlap.
+ */
+ @Nullable
+ private ChangeId overlap(Set<ChangeId> a, Set<ChangeId> b) {
// iterate over the smaller set, and check for every element in the other set
if (a.size() > b.size()) {
for (ChangeId change : b) {
if (a.contains(change)) {
- return true;
+ return change;
}
}
} else {
for (ChangeId change : a) {
if (b.contains(change)) {
- return true;
+ return change;
}
}
}
- return false;
+ return null;
}
private void moveReadPointerIfNeeded(long committedWritePointer) {
@@ -1385,9 +1405,9 @@ public class TransactionManager extends AbstractService {
}
private abstract static class DaemonThreadExecutor extends Thread {
- private AtomicBoolean stopped = new AtomicBoolean(false);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
- public DaemonThreadExecutor(String name) {
+ DaemonThreadExecutor(String name) {
super(name);
setDaemon(true);
}
@@ -1577,4 +1597,25 @@ public class TransactionManager extends AbstractService {
}
}
+ /**
+ * Represents a set of changes from a client.
+ */
+ public static class ChangeSet {
+ final String clientId;
+ final Set<ChangeId> changeIds;
+
+ ChangeSet(@Nullable String clientId, Set<ChangeId> changeIds) {
+ this.clientId = clientId;
+ this.changeIds = changeIds;
+ }
+
+ @Nullable
+ public String getClientId() {
+ return clientId;
+ }
+
+ public Set<ChangeId> getChangeIds() {
+ return changeIds;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
index a44f131..29fbe62 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
@@ -60,7 +60,7 @@ public interface TransactionSystemClient {
/**
* Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and
* refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call
- * and {@link #commit(Transaction)} which may check conflicts again to avoid races.
+ * and {@link #commitOrThrow(Transaction)} which will check conflicts again to avoid races.
* <p/>
* Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts
* before we persist changes to avoid rollback in case of conflicts as much as possible.
@@ -80,7 +80,7 @@ public interface TransactionSystemClient {
/**
* Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and
* refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call
- * and {@link #commit(Transaction)} which may check conflicts again to avoid races.
+ * and {@link #commitOrThrow(Transaction)} which will check conflicts again to avoid races.
* <p/>
* Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts
* before we persist changes to avoid rollback in case of conflicts as much as possible.
@@ -89,21 +89,38 @@ public interface TransactionSystemClient {
*
* @param tx transaction to verify
* @param changeIds ids of changes made by transaction
- * @return true if transaction can be committed otherwise false
- * @throws TransactionSizeException if the size of the chgange set exceeds the allowed limit
- * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out.
+ *
+ * @throws TransactionSizeException if the size of the change set exceeds the allowed limit
+ * @throws TransactionConflictException if the change set has a conflict with an overlapping transaction
+ * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out
*/
- boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException;
+ void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
+ throws TransactionFailureException;
/**
* Makes transaction visible. It will again check conflicts of changes submitted previously with
- * {@link #canCommit(Transaction, java.util.Collection)}
+ * {@link #canCommitOrThrow(Transaction, java.util.Collection)}
+ *
* @param tx transaction to make visible.
* @return true if transaction can be committed otherwise false
+ *
+ * @deprecated as of 0.13-incubating. Use {@link #commitOrThrow(Transaction)} instead.
*/
+ @Deprecated
boolean commit(Transaction tx) throws TransactionNotInProgressException;
/**
+ * Makes transaction visible. It will again check conflicts of changes submitted previously with
+ * {@link #canCommitOrThrow(Transaction, java.util.Collection)}
+ *
+ * @param tx transaction to make visible.
+ *
+ * @throws TransactionConflictException if the transaction has a conflict with an overlapping transaction
+ * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out
+ */
+ void commitOrThrow(Transaction tx) throws TransactionFailureException;
+
+ /**
* Makes transaction visible. You should call it only when all changes of this tx are undone.
* NOTE: it will not throw {@link TransactionNotInProgressException} if transaction has timed out.
* @param tx transaction to make visible.
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 5c78aa4..3a6b70a 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -196,6 +196,17 @@ public class TxConstants {
public static final long DEFAULT_TX_CHANGESET_SIZE_LIMIT = Long.MAX_VALUE;
/** The default warning threshold for the total size in bytes of a change set is unlimited. */
public static final long DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD = Long.MAX_VALUE;
+
+ /** Whether and how long to retain the client id of a transaction. Valid values are:
+ * <ul>
+ * <li>OFF - do not retain the client id at all</li>
+ * <li>ACTIVE - retain the client id until a transaction commits, aborts, or is invalidated</li>
+ * <li>COMMITTED - retain the client id after it commits, as long as it participates in conflict detection</li>
+ * </ul>
+ */
+ public static final String CFG_TX_RETAIN_CLIENT_ID = "data.tx.retain.client.id";
+ /** Default for how long to retain a transaction's client id */
+ public static final String DEFAULT_TX_RETAIN_CLIENT_ID = "COMMITTED";
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
index f1743de..938eef5 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
@@ -27,6 +27,7 @@ import com.google.inject.name.Named;
import org.apache.hadoop.conf.Configuration;
import org.apache.tephra.InvalidTruncateTimeException;
import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
import org.apache.tephra.TransactionFailureException;
import org.apache.tephra.TransactionNotInProgressException;
@@ -75,7 +76,6 @@ public class TransactionServiceClient implements TransactionSystemClient {
/**
* Utility to be used for basic verification of transaction system availability and functioning
* @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx
- * @throws Exception
*/
public static void main(String[] args) throws Exception {
if (args.length > 1 || (args.length == 1 && !"-v".equals(args[0]))) {
@@ -115,19 +115,14 @@ public class TransactionServiceClient implements TransactionSystemClient {
", invalids: " + tx.getInvalids().length +
", inProgress: " + tx.getInProgress().length);
}
- LOG.info("Checking if canCommit tx...");
- boolean canCommit = client.canCommitOrThrow(tx, Collections.<byte[]>emptyList());
- LOG.info("canCommit: " + canCommit);
- if (canCommit) {
+ try {
+ LOG.info("Checking if canCommit tx...");
+ client.canCommitOrThrow(tx, Collections.<byte[]>emptyList());
+ LOG.info("canCommit: success");
LOG.info("Committing tx...");
- boolean committed = client.commit(tx);
- LOG.info("Committed tx: " + committed);
- if (!committed) {
- LOG.info("Aborting tx...");
- client.abort(tx);
- LOG.info("Aborted tx...");
- }
- } else {
+ client.commitOrThrow(tx);
+ LOG.info("Committed tx: success");
+ } catch (TransactionConflictException e) {
LOG.info("Aborting tx...");
client.abort(tx);
LOG.info("Aborted tx...");
@@ -322,25 +317,16 @@ public class TransactionServiceClient implements TransactionSystemClient {
@Override
public boolean canCommit(final Transaction tx, final Collection<byte[]> changeIds)
throws TransactionNotInProgressException {
-
try {
- return execute(
- new Operation<Boolean>("canCommit") {
- @Override
- public Boolean execute(TransactionServiceThriftClient client)
- throws Exception {
- return client.canCommit(tx, changeIds);
- }
- });
- } catch (TransactionNotInProgressException e) {
- throw e;
- } catch (Exception e) {
- throw Throwables.propagate(e);
+ canCommitOrThrow(tx, changeIds);
+ return true;
+ } catch (TransactionFailureException e) {
+ return false;
}
}
@Override
- public boolean canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds)
+ public void canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds)
throws TransactionFailureException {
// we want to validate the size of the change set here before sending it over the wire.
@@ -369,15 +355,16 @@ public class TransactionServiceClient implements TransactionSystemClient {
}
try {
- return execute(
- new Operation<Boolean>("canCommit") {
+ execute(
+ new Operation<Void>("canCommit") {
@Override
- public Boolean execute(TransactionServiceThriftClient client)
+ public Void execute(TransactionServiceThriftClient client)
throws Exception {
- return client.canCommitOrThrow(tx, changeIds);
+ client.canCommit(tx, changeIds);
+ return null;
}
});
- } catch (TransactionNotInProgressException | TransactionSizeException e) {
+ } catch (TransactionNotInProgressException | TransactionSizeException | TransactionConflictException e) {
throw e;
} catch (Exception e) {
throw Throwables.propagate(e);
@@ -387,15 +374,26 @@ public class TransactionServiceClient implements TransactionSystemClient {
@Override
public boolean commit(final Transaction tx) throws TransactionNotInProgressException {
try {
- return this.execute(
- new Operation<Boolean>("commit") {
+ commitOrThrow(tx);
+ return true;
+ } catch (TransactionFailureException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public void commitOrThrow(final Transaction tx)
+ throws TransactionFailureException {
+ try {
+ execute(
+ new Operation<Void>("commit") {
@Override
- public Boolean execute(TransactionServiceThriftClient client)
- throws Exception {
- return client.commit(tx);
+ public Void execute(TransactionServiceThriftClient client) throws Exception {
+ client.commit(tx.getTransactionId(), tx.getWritePointer());
+ return null;
}
});
- } catch (TransactionNotInProgressException e) {
+ } catch (TransactionNotInProgressException | TransactionConflictException e) {
throw e;
} catch (Exception e) {
throw Throwables.propagate(e);
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
index ba37243..6ce7b84 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
@@ -23,11 +23,13 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.tephra.InvalidTruncateTimeException;
import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
import org.apache.tephra.TransactionNotInProgressException;
import org.apache.tephra.TransactionSizeException;
import org.apache.tephra.distributed.thrift.TGenericException;
import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException;
+import org.apache.tephra.distributed.thrift.TTransactionConflictException;
import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException;
import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException;
import org.apache.tephra.distributed.thrift.TTransactionServer;
@@ -64,12 +66,12 @@ public class TransactionServiceThriftClient {
/**
* The thrift transport layer. We need this when we close the connection.
*/
- TTransport transport;
+ private TTransport transport;
/**
* The actual thrift client.
*/
- TTransactionServer.Client client;
+ private TTransactionServer.Client client;
/**
* Whether this client is valid for use.
@@ -184,30 +186,19 @@ public class TransactionServiceThriftClient {
}
}
- public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
- throws TException, TransactionNotInProgressException {
+ public void canCommit(Transaction tx, Collection<byte[]> changeIds)
+ throws TException, TransactionNotInProgressException, TransactionSizeException, TransactionConflictException {
try {
- return client.canCommitTx(TransactionConverterUtils.wrap(tx),
- ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue();
- } catch (TTransactionNotInProgressException e) {
- throw new TransactionNotInProgressException(e.getMessage());
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
- throws TException, TransactionNotInProgressException, TransactionSizeException {
- try {
- return client.canCommitTx(TransactionConverterUtils.wrap(tx),
- ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue();
+ client.canCommitOrThrow(tx.getTransactionId(),
+ ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER)));
} catch (TTransactionNotInProgressException e) {
throw new TransactionNotInProgressException(e.getMessage());
+ } catch (TTransactionConflictException e) {
+ throw new TransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient());
} catch (TGenericException e) {
// currently, we only expect TransactionSizeException here
if (!TransactionSizeException.class.getName().equals(e.getOriginalExceptionClass())) {
- LOG.trace("Expecting only {} as the original exception class but found {}",
+ LOG.debug("Expecting only {} as the original exception class but found {}",
TransactionSizeException.class.getName(), e.getOriginalExceptionClass());
throw e;
}
@@ -218,11 +209,18 @@ public class TransactionServiceThriftClient {
}
}
- public boolean commit(Transaction tx) throws TException, TransactionNotInProgressException {
+ public void commit(long txId, long wp)
+ throws TException, TransactionNotInProgressException, TransactionConflictException {
try {
- return client.commitTx(TransactionConverterUtils.wrap(tx)).isValue();
+ client.commitOrThrow(txId, wp);
} catch (TTransactionNotInProgressException e) {
throw new TransactionNotInProgressException(e.getMessage());
+ } catch (TTransactionConflictException e) {
+ throw new TransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient());
+ } catch (TGenericException e) {
+ // we never throw this from commitOrThrow() - it was added as a placeholder to avoid future thrift API changes
+ LOG.debug("Unexpected {} from commitOrThrow()", TGenericException.class.getName());
+ throw e;
} catch (TException e) {
isValid.set(false);
throw e;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
index 0c9105b..95988c0 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
@@ -20,6 +20,7 @@ package org.apache.tephra.distributed;
import com.google.common.collect.Sets;
import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.TransactionConflictException;
import org.apache.tephra.TransactionManager;
import org.apache.tephra.TransactionNotInProgressException;
import org.apache.tephra.TransactionSizeException;
@@ -28,6 +29,7 @@ import org.apache.tephra.distributed.thrift.TBoolean;
import org.apache.tephra.distributed.thrift.TGenericException;
import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException;
import org.apache.tephra.distributed.thrift.TTransaction;
+import org.apache.tephra.distributed.thrift.TTransactionConflictException;
import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException;
import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException;
import org.apache.tephra.distributed.thrift.TTransactionServer;
@@ -119,25 +121,16 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface
@Override
public TBoolean canCommitTx(TTransaction tx, Set<ByteBuffer> changes) throws TException {
-
- Set<byte[]> changeIds = Sets.newHashSet();
- for (ByteBuffer bb : changes) {
- byte[] changeId = new byte[bb.remaining()];
- bb.get(changeId);
- changeIds.add(changeId);
- }
try {
- return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds));
- } catch (TransactionNotInProgressException e) {
- throw new TTransactionNotInProgressException(e.getMessage());
- } catch (TransactionSizeException e) {
- return new TBoolean(false); // can't throw exception -> just indicate that it failed
+ canCommitOrThrow(tx.getTransactionId(), changes);
+ return new TBoolean(true);
+ } catch (TTransactionConflictException | TGenericException e) {
+ return new TBoolean(false);
}
}
@Override
- public TBoolean canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws TException {
-
+ public void canCommitOrThrow(long txId, Set<ByteBuffer> changes) throws TException {
Set<byte[]> changeIds = Sets.newHashSet();
for (ByteBuffer bb : changes) {
byte[] changeId = new byte[bb.remaining()];
@@ -145,20 +138,34 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface
changeIds.add(changeId);
}
try {
- return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds));
+ txManager.canCommit(txId, changeIds);
} catch (TransactionNotInProgressException e) {
throw new TTransactionNotInProgressException(e.getMessage());
} catch (TransactionSizeException e) {
throw new TGenericException(e.getMessage(), TransactionSizeException.class.getName());
+ } catch (TransactionConflictException e) {
+ throw new TTransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient());
}
}
@Override
public TBoolean commitTx(TTransaction tx) throws TException {
try {
- return new TBoolean(txManager.commit(TransactionConverterUtils.unwrap(tx)));
+ commitOrThrow(tx.getTransactionId(), tx.getWritePointer());
+ return new TBoolean(true);
+ } catch (TTransactionConflictException | TGenericException e) {
+ return new TBoolean(false);
+ }
+ }
+
+ @Override
+ public void commitOrThrow(long txId, long wp) throws TException {
+ try {
+ txManager.commit(txId, wp);
} catch (TransactionNotInProgressException e) {
throw new TTransactionNotInProgressException(e.getMessage());
+ } catch (TransactionConflictException e) {
+ throw new TTransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java
new file mode 100644
index 0000000..d7c6c9f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.tephra.distributed.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TTransactionConflictException extends TException implements org.apache.thrift.TBase<TTransactionConflictException, TTransactionConflictException._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TTransactionConflictException");
+
+ private static final org.apache.thrift.protocol.TField TRANSACTION_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("transactionId", org.apache.thrift.protocol.TType.I64, (short)1);
+ private static final org.apache.thrift.protocol.TField CONFLICTING_KEY_FIELD_DESC = new org.apache.thrift.protocol.TField("conflictingKey", org.apache.thrift.protocol.TType.STRING, (short)2);
+ private static final org.apache.thrift.protocol.TField CONFLICTING_CLIENT_FIELD_DESC = new org.apache.thrift.protocol.TField("conflictingClient", org.apache.thrift.protocol.TType.STRING, (short)3);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new TTransactionConflictExceptionStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new TTransactionConflictExceptionTupleSchemeFactory());
+ }
+
+ public long transactionId; // required
+ public String conflictingKey; // required
+ public String conflictingClient; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ TRANSACTION_ID((short)1, "transactionId"),
+ CONFLICTING_KEY((short)2, "conflictingKey"),
+ CONFLICTING_CLIENT((short)3, "conflictingClient");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // TRANSACTION_ID
+ return TRANSACTION_ID;
+ case 2: // CONFLICTING_KEY
+ return CONFLICTING_KEY;
+ case 3: // CONFLICTING_CLIENT
+ return CONFLICTING_CLIENT;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __TRANSACTIONID_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.TRANSACTION_ID, new org.apache.thrift.meta_data.FieldMetaData("transactionId", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.CONFLICTING_KEY, new org.apache.thrift.meta_data.FieldMetaData("conflictingKey", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.CONFLICTING_CLIENT, new org.apache.thrift.meta_data.FieldMetaData("conflictingClient", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TTransactionConflictException.class, metaDataMap);
+ }
+
+ public TTransactionConflictException() {
+ }
+
+ public TTransactionConflictException(
+ long transactionId,
+ String conflictingKey,
+ String conflictingClient)
+ {
+ this();
+ this.transactionId = transactionId;
+ setTransactionIdIsSet(true);
+ this.conflictingKey = conflictingKey;
+ this.conflictingClient = conflictingClient;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public TTransactionConflictException(TTransactionConflictException other) {
+ __isset_bitfield = other.__isset_bitfield;
+ this.transactionId = other.transactionId;
+ if (other.isSetConflictingKey()) {
+ this.conflictingKey = other.conflictingKey;
+ }
+ if (other.isSetConflictingClient()) {
+ this.conflictingClient = other.conflictingClient;
+ }
+ }
+
+ public TTransactionConflictException deepCopy() {
+ return new TTransactionConflictException(this);
+ }
+
+ @Override
+ public void clear() {
+ setTransactionIdIsSet(false);
+ this.transactionId = 0;
+ this.conflictingKey = null;
+ this.conflictingClient = null;
+ }
+
+ public long getTransactionId() {
+ return this.transactionId;
+ }
+
+ public TTransactionConflictException setTransactionId(long transactionId) {
+ this.transactionId = transactionId;
+ setTransactionIdIsSet(true);
+ return this;
+ }
+
+ public void unsetTransactionId() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TRANSACTIONID_ISSET_ID);
+ }
+
+ /** Returns true if field transactionId is set (has been assigned a value) and false otherwise */
+ public boolean isSetTransactionId() {
+ return EncodingUtils.testBit(__isset_bitfield, __TRANSACTIONID_ISSET_ID);
+ }
+
+ public void setTransactionIdIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TRANSACTIONID_ISSET_ID, value);
+ }
+
+ public String getConflictingKey() {
+ return this.conflictingKey;
+ }
+
+ public TTransactionConflictException setConflictingKey(String conflictingKey) {
+ this.conflictingKey = conflictingKey;
+ return this;
+ }
+
+ public void unsetConflictingKey() {
+ this.conflictingKey = null;
+ }
+
+ /** Returns true if field conflictingKey is set (has been assigned a value) and false otherwise */
+ public boolean isSetConflictingKey() {
+ return this.conflictingKey != null;
+ }
+
+ public void setConflictingKeyIsSet(boolean value) {
+ if (!value) {
+ this.conflictingKey = null;
+ }
+ }
+
+ public String getConflictingClient() {
+ return this.conflictingClient;
+ }
+
+ public TTransactionConflictException setConflictingClient(String conflictingClient) {
+ this.conflictingClient = conflictingClient;
+ return this;
+ }
+
+ public void unsetConflictingClient() {
+ this.conflictingClient = null;
+ }
+
+ /** Returns true if field conflictingClient is set (has been assigned a value) and false otherwise */
+ public boolean isSetConflictingClient() {
+ return this.conflictingClient != null;
+ }
+
+ public void setConflictingClientIsSet(boolean value) {
+ if (!value) {
+ this.conflictingClient = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case TRANSACTION_ID:
+ if (value == null) {
+ unsetTransactionId();
+ } else {
+ setTransactionId((Long)value);
+ }
+ break;
+
+ case CONFLICTING_KEY:
+ if (value == null) {
+ unsetConflictingKey();
+ } else {
+ setConflictingKey((String)value);
+ }
+ break;
+
+ case CONFLICTING_CLIENT:
+ if (value == null) {
+ unsetConflictingClient();
+ } else {
+ setConflictingClient((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case TRANSACTION_ID:
+ return Long.valueOf(getTransactionId());
+
+ case CONFLICTING_KEY:
+ return getConflictingKey();
+
+ case CONFLICTING_CLIENT:
+ return getConflictingClient();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case TRANSACTION_ID:
+ return isSetTransactionId();
+ case CONFLICTING_KEY:
+ return isSetConflictingKey();
+ case CONFLICTING_CLIENT:
+ return isSetConflictingClient();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof TTransactionConflictException)
+ return this.equals((TTransactionConflictException)that);
+ return false;
+ }
+
+ public boolean equals(TTransactionConflictException that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_transactionId = true;
+ boolean that_present_transactionId = true;
+ if (this_present_transactionId || that_present_transactionId) {
+ if (!(this_present_transactionId && that_present_transactionId))
+ return false;
+ if (this.transactionId != that.transactionId)
+ return false;
+ }
+
+ boolean this_present_conflictingKey = true && this.isSetConflictingKey();
+ boolean that_present_conflictingKey = true && that.isSetConflictingKey();
+ if (this_present_conflictingKey || that_present_conflictingKey) {
+ if (!(this_present_conflictingKey && that_present_conflictingKey))
+ return false;
+ if (!this.conflictingKey.equals(that.conflictingKey))
+ return false;
+ }
+
+ boolean this_present_conflictingClient = true && this.isSetConflictingClient();
+ boolean that_present_conflictingClient = true && that.isSetConflictingClient();
+ if (this_present_conflictingClient || that_present_conflictingClient) {
+ if (!(this_present_conflictingClient && that_present_conflictingClient))
+ return false;
+ if (!this.conflictingClient.equals(that.conflictingClient))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(TTransactionConflictException other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ TTransactionConflictException typedOther = (TTransactionConflictException)other;
+
+ lastComparison = Boolean.valueOf(isSetTransactionId()).compareTo(typedOther.isSetTransactionId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTransactionId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.transactionId, typedOther.transactionId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetConflictingKey()).compareTo(typedOther.isSetConflictingKey());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetConflictingKey()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.conflictingKey, typedOther.conflictingKey);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetConflictingClient()).compareTo(typedOther.isSetConflictingClient());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetConflictingClient()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.conflictingClient, typedOther.conflictingClient);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("TTransactionConflictException(");
+ boolean first = true;
+
+ sb.append("transactionId:");
+ sb.append(this.transactionId);
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("conflictingKey:");
+ if (this.conflictingKey == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.conflictingKey);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("conflictingClient:");
+ if (this.conflictingClient == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.conflictingClient);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class TTransactionConflictExceptionStandardSchemeFactory implements SchemeFactory {
+ public TTransactionConflictExceptionStandardScheme getScheme() {
+ return new TTransactionConflictExceptionStandardScheme();
+ }
+ }
+
+ /**
+  * Field-tagged (de)serializer for TTransactionConflictException: every field is
+  * written with its field header, and reading dispatches on the incoming field id.
+  */
+ private static class TTransactionConflictExceptionStandardScheme extends StandardScheme<TTransactionConflictException> {
+
+   /**
+    * Reads fields from {@code iprot} into {@code struct} until the STOP marker.
+    * A field whose id is unknown, or whose wire type does not match the expected
+    * one, is skipped. {@code struct.validate()} is invoked once the struct is read.
+    */
+   @Override
+   public void read(org.apache.thrift.protocol.TProtocol iprot, TTransactionConflictException struct) throws org.apache.thrift.TException {
+     org.apache.thrift.protocol.TField schemeField;
+     iprot.readStructBegin();
+     while (true)
+     {
+       schemeField = iprot.readFieldBegin();
+       if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+         break;
+       }
+       switch (schemeField.id) {
+         case 1: // TRANSACTION_ID
+           if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+             struct.transactionId = iprot.readI64();
+             struct.setTransactionIdIsSet(true);
+           } else {
+             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+           }
+           break;
+         case 2: // CONFLICTING_KEY
+           if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+             struct.conflictingKey = iprot.readString();
+             struct.setConflictingKeyIsSet(true);
+           } else {
+             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+           }
+           break;
+         case 3: // CONFLICTING_CLIENT
+           if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+             struct.conflictingClient = iprot.readString();
+             struct.setConflictingClientIsSet(true);
+           } else {
+             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+           }
+           break;
+         default:
+           // Unknown field id: consume its payload and move on.
+           org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+       }
+       iprot.readFieldEnd();
+     }
+     iprot.readStructEnd();
+
+     // check for required fields of primitive type, which can't be checked in the validate method
+     struct.validate();
+   }
+
+   /**
+    * Validates {@code struct}, then writes it to {@code oprot}. The transaction id
+    * is always written; the conflicting key and conflicting client are written
+    * only when they are non-null.
+    */
+   @Override
+   public void write(org.apache.thrift.protocol.TProtocol oprot, TTransactionConflictException struct) throws org.apache.thrift.TException {
+     struct.validate();
+
+     oprot.writeStructBegin(STRUCT_DESC);
+     oprot.writeFieldBegin(TRANSACTION_ID_FIELD_DESC);
+     oprot.writeI64(struct.transactionId);
+     oprot.writeFieldEnd();
+     if (struct.conflictingKey != null) {
+       oprot.writeFieldBegin(CONFLICTING_KEY_FIELD_DESC);
+       oprot.writeString(struct.conflictingKey);
+       oprot.writeFieldEnd();
+     }
+     if (struct.conflictingClient != null) {
+       oprot.writeFieldBegin(CONFLICTING_CLIENT_FIELD_DESC);
+       oprot.writeString(struct.conflictingClient);
+       oprot.writeFieldEnd();
+     }
+     oprot.writeFieldStop();
+     oprot.writeStructEnd();
+   }
+
+ }
+
+ /** Factory that returns a new tuple-scheme (de)serializer on every call. */
+ private static class TTransactionConflictExceptionTupleSchemeFactory implements SchemeFactory {
+   // @Override makes the compiler verify this still implements SchemeFactory.getScheme().
+   @Override
+   public TTransactionConflictExceptionTupleScheme getScheme() {
+     return new TTransactionConflictExceptionTupleScheme();
+   }
+ }
+
+ private static class TTransactionConflictExceptionTupleScheme extends TupleScheme<TTransactionConflictException> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, TTransactionConflictException struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetTransactionId()) {
+ optionals.set(0);
+ }
+ if (struct.isSetConflictingKey()) {
+ optionals.set(1);
+ }
+ if (struct.isSetConflictingClient()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
+ if (struct.isSetTransactionId()) {
+ oprot.writeI64(struct.transactionId);
+ }
+ if (struct.isSetConflictingKey()) {
+ oprot.writeString(struct.conflictingKey);
+ }
+ if (struct.isSetConflictingClient()) {
+ oprot.writeString(struct.conflictingClient);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, TTransactionConflictException struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(3);
+ if (incoming.get(0)) {
+ struct.transactionId = iprot.readI64();
+ struct.setTransactionIdIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.conflictingKey = iprot.readString();
+ struct.setConflictingKeyIsSet(true);
+ }
+ if (incoming.get(2)) {
+ struct.conflictingClient = iprot.readString();
+ struct.setConflictingClientIsSet(true);
+ }
+ }
+ }
+
+}
+