You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/12 23:23:52 UTC

[4/4] incubator-tephra git commit: (TEPHRA-240) Include conflicting key and client id in TransactionConflictException

(TEPHRA-240) Include conflicting key and client id in TransactionConflictException

This closes #47 from GitHub.

Signed-off-by: anew <an...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/174c3325
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/174c3325
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/174c3325

Branch: refs/heads/master
Commit: 174c3325366e33ea7979f2ecfa621b68d77b8a35
Parents: 810c9dd
Author: anew <an...@apache.org>
Authored: Sun Sep 10 21:33:53 2017 -0700
Committer: anew <an...@apache.org>
Committed: Tue Sep 12 16:22:58 2017 -0700

----------------------------------------------------------------------
 .../tephra/TransactionConflictException.java    |   39 +
 .../org/apache/tephra/TransactionContext.java   |   32 +-
 .../org/apache/tephra/TransactionManager.java   |  187 +-
 .../apache/tephra/TransactionSystemClient.java  |   31 +-
 .../java/org/apache/tephra/TxConstants.java     |   11 +
 .../distributed/TransactionServiceClient.java   |   74 +-
 .../TransactionServiceThriftClient.java         |   42 +-
 .../TransactionServiceThriftHandler.java        |   39 +-
 .../thrift/TTransactionConflictException.java   |  602 ++++++
 .../distributed/thrift/TTransactionServer.java  | 1818 ++++++++++++++----
 .../tephra/inmemory/DetachedTxSystemClient.java |   15 +-
 .../tephra/inmemory/InMemoryTxSystemClient.java |   27 +-
 .../tephra/inmemory/MinimalTxSystemClient.java  |    9 +-
 .../tephra/persist/TransactionSnapshot.java     |   12 +-
 tephra-core/src/main/thrift/transaction.thrift  |   26 +-
 .../java/org/apache/tephra/ClientIdTest.java    |  114 ++
 .../java/org/apache/tephra/DummyTxAware.java    |  123 ++
 .../java/org/apache/tephra/DummyTxClient.java   |   91 +
 .../apache/tephra/TransactionContextTest.java   |  207 +-
 .../apache/tephra/TransactionExecutorTest.java  |  211 +-
 .../apache/tephra/TransactionManagerTest.java   |  106 +-
 .../apache/tephra/TransactionSystemTest.java    |  151 +-
 .../ThriftTransactionServerTest.java            |    4 +-
 .../AbstractTransactionStateStorageTest.java    |   22 +-
 .../tephra/snapshot/SnapshotCodecTest.java      |    4 +-
 .../coprocessor/TransactionProcessorTest.java   |    4 +-
 .../TransactionVisibilityFilterTest.java        |   13 +-
 .../coprocessor/TransactionProcessorTest.java   |    4 +-
 .../TransactionVisibilityFilterTest.java        |   13 +-
 .../coprocessor/TransactionProcessorTest.java   |    4 +-
 .../TransactionVisibilityFilterTest.java        |   13 +-
 .../coprocessor/TransactionProcessorTest.java   |    4 +-
 .../TransactionVisibilityFilterTest.java        |   13 +-
 .../coprocessor/TransactionProcessorTest.java   |   10 +-
 .../TransactionVisibilityFilterTest.java        |   13 +-
 .../coprocessor/TransactionProcessorTest.java   |    4 +-
 .../TransactionVisibilityFilterTest.java        |   13 +-
 37 files changed, 2997 insertions(+), 1108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java
----------------------------------------------------------------------
diff --git a/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java b/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java
index d07ed04..d3bd180 100644
--- a/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java
+++ b/tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java
@@ -22,11 +22,50 @@ package org.apache.tephra;
  * Thrown to indicate transaction conflict occurred when trying to commit a transaction.
  */
 public class TransactionConflictException extends TransactionFailureException {
+
+  private final Long transactionId;
+  private final String conflictingKey;
+  private final String conflictingClient;
+
+  /**
+   * @deprecated since 0.13-incubating. Use {@link #TransactionConflictException(long, String, String)} instead.
+   */
+  @Deprecated
   public TransactionConflictException(String message) {
     super(message);
+    transactionId = null;
+    conflictingKey = null;
+    conflictingClient = null;
   }
 
+  /**
+   * @deprecated since 0.13-incubating. Use {@link #TransactionConflictException(long, String, String)} instead.
+   */
+  @Deprecated
   public TransactionConflictException(String message, Throwable cause) {
     super(message, cause);
+    transactionId = null;
+    conflictingKey = null;
+    conflictingClient = null;
+  }
+
+  public TransactionConflictException(long transactionId, String conflictingKey, String conflictingClient) {
+    super(String.format("Transaction %d conflicts with %s on change key '%s'", transactionId,
+                        conflictingClient == null ? "unknown client" : conflictingClient, conflictingKey));
+    this.transactionId = transactionId;
+    this.conflictingKey = conflictingKey;
+    this.conflictingClient = conflictingClient;
+  }
+
+  public Long getTransactionId() {
+    return transactionId;
+  }
+
+  public String getConflictingKey() {
+    return conflictingKey;
+  }
+
+  public String getConflictingClient() {
+    return conflictingClient;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
index 8b4e4fd..3c11e96 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -267,23 +267,16 @@ public class TransactionContext {
         // abort will throw that exception
       }
     }
-
-    boolean canCommit = false;
     try {
-      canCommit = txClient.canCommitOrThrow(currentTx, changes);
-    } catch (TransactionNotInProgressException | TransactionSizeException e) {
-      throw e;
-      // abort will throw that exception
+      txClient.canCommitOrThrow(currentTx, changes);
+    } catch (TransactionFailureException e) {
+      abort(e);
+      // abort will rethrow this exception
     } catch (Throwable e) {
       String message = String.format("Exception from canCommit for transaction %d.", currentTx.getTransactionId());
       abort(new TransactionFailureException(message, e));
       // abort will throw that exception
     }
-    if (!canCommit) {
-      String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
-      abort(new TransactionConflictException(message));
-      // abort will throw
-    }
   }
 
   private void persist() throws TransactionFailureException {
@@ -311,25 +304,16 @@ public class TransactionContext {
   }
 
   private void commit() throws TransactionFailureException {
-    boolean commitSuccess = false;
     try {
-      commitSuccess = txClient.commit(currentTx);
-    } catch (TransactionNotInProgressException e) {
-      String message = String.format("Transaction %d is not in progress.", currentTx.getTransactionId());
-      LOG.warn(message, e);
-      abort(new TransactionFailureException(message, e));
-      // abort will throw that exception
+      txClient.commitOrThrow(currentTx);
+    } catch (TransactionFailureException e) {
+      abort(e);
+      // abort will rethrow this exception
     } catch (Throwable e) {
       String message = String.format("Exception from commit for transaction %d.", currentTx.getTransactionId());
-      LOG.warn(message, e);
       abort(new TransactionFailureException(message, e));
       // abort will throw that exception
     }
-    if (!commitSuccess) {
-      String message = String.format("Conflict detected for transaction %d.", currentTx.getTransactionId());
-      abort(new TransactionConflictException(message));
-      // abort will throw
-    }
   }
 
   private void postCommit() throws TransactionFailureException {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index 4479812..68450c9 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -130,9 +130,9 @@ public class TransactionManager extends AbstractService {
   // todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil)
   // todo: should this be consolidated with inProgress?
   // commit time next writePointer -> changes made by this tx
-  private final NavigableMap<Long, Set<ChangeId>> committedChangeSets = new ConcurrentSkipListMap<>();
+  private final NavigableMap<Long, ChangeSet> committedChangeSets = new ConcurrentSkipListMap<>();
   // not committed yet
-  private final Map<Long, Set<ChangeId>> committingChangeSets = Maps.newConcurrentMap();
+  private final Map<Long, ChangeSet> committingChangeSets = Maps.newConcurrentMap();
 
   private long readPointer;
   private long lastWritePointer;
@@ -157,6 +157,10 @@ public class TransactionManager extends AbstractService {
   private DaemonThreadExecutor snapshotThread;
   private DaemonThreadExecutor metricsThread;
 
+  // retention of client id for transactions - this affects memory footprint
+  private final boolean retainClientId;
+  private final boolean retainClientIdPastCommit;
+
   // lock guarding change of the current transaction log
   private final ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
   private final Lock logReadLock = logLock.readLock();
@@ -206,12 +210,19 @@ public class TransactionManager extends AbstractService {
     // TODO: REMOVE WITH txnBackwardsCompatCheck()
     longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 10000);
 
-    //
+    ClientIdRetention retention = ClientIdRetention.valueOf(
+      conf.get(TxConstants.Manager.CFG_TX_RETAIN_CLIENT_ID,
+               TxConstants.Manager.DEFAULT_TX_RETAIN_CLIENT_ID).toUpperCase());
+    this.retainClientId = retention != ClientIdRetention.OFF;
+    this.retainClientIdPastCommit = retention == ClientIdRetention.COMMITTED;
+
     this.txMetricsCollector = txMetricsCollector;
     this.txMetricsCollector.configure(conf);
     clear();
   }
 
+  enum ClientIdRetention { OFF, ACTIVE, COMMITTED }
+
   private void clear() {
     invalidTxList.clear();
     inProgress.clear();
@@ -400,7 +411,7 @@ public class TransactionManager extends AbstractService {
   }
 
   public synchronized TransactionSnapshot getSnapshot() throws IOException {
-    TransactionSnapshot snapshot = null;
+    TransactionSnapshot snapshot;
     if (!isRunning() && !isStopping()) {
       return null;
     }
@@ -436,8 +447,8 @@ public class TransactionManager extends AbstractService {
 
   private void doSnapshot(boolean closing) throws IOException {
     long snapshotTime = 0L;
-    TransactionSnapshot snapshot = null;
-    TransactionLog oldLog = null;
+    TransactionSnapshot snapshot;
+    TransactionLog oldLog;
     try {
       this.logWriteLock.lock();
       try {
@@ -521,8 +532,12 @@ public class TransactionManager extends AbstractService {
     lastWritePointer = snapshot.getWritePointer();
     invalidTxList.addAll(snapshot.getInvalid());
     inProgress.putAll(txnBackwardsCompatCheck(defaultLongTimeout, longTimeoutTolerance, snapshot.getInProgress()));
-    committingChangeSets.putAll(snapshot.getCommittingChangeSets());
-    committedChangeSets.putAll(snapshot.getCommittedChangeSets());
+    for (Map.Entry<Long, Set<ChangeId>> entry : snapshot.getCommittingChangeSets().entrySet()) {
+      committingChangeSets.put(entry.getKey(), new ChangeSet(null, entry.getValue()));
+    }
+    for (Map.Entry<Long, Set<ChangeId>> entry : snapshot.getCommittedChangeSets().entrySet()) {
+      committedChangeSets.put(entry.getKey(), new ChangeSet(null, entry.getValue()));
+    }
   }
 
   /**
@@ -593,7 +608,7 @@ public class TransactionManager extends AbstractService {
         if (reader == null) {
           continue;
         }
-        TransactionEdit edit = null;
+        TransactionEdit edit;
         while ((edit = reader.next()) != null) {
           editCnt++;
           switch (edit.getState()) {
@@ -613,7 +628,7 @@ public class TransactionManager extends AbstractService {
               addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), expiration, type, null);
               break;
             case COMMITTING:
-              addCommittingChangeSet(edit.getWritePointer(), edit.getChanges());
+              addCommittingChangeSet(edit.getWritePointer(), null, edit.getChanges());
               break;
             case COMMITTED:
               // TODO: need to reconcile usage of transaction id v/s write pointer TEPHRA-140
@@ -621,7 +636,7 @@ public class TransactionManager extends AbstractService {
               long[] checkpointPointers = edit.getCheckpointPointers();
               long writePointer = checkpointPointers == null || checkpointPointers.length == 0 ?
                 transactionId : checkpointPointers[checkpointPointers.length - 1];
-              doCommit(transactionId, writePointer, edit.getChanges(),
+              doCommit(transactionId, writePointer, new ChangeSet(null, edit.getChanges()),
                        edit.getCommitPointer(), edit.getCanCommit());
               break;
             case INVALID:
@@ -670,9 +685,7 @@ public class TransactionManager extends AbstractService {
               throw new IllegalArgumentException("Invalid state for WAL entry: " + edit.getState());
           }
         }
-      } catch (IOException ioe) {
-        throw Throwables.propagate(ioe);
-      } catch (InvalidTruncateTimeException e) {
+      } catch (IOException | InvalidTruncateTimeException e) {
         throw Throwables.propagate(e);
       }
       LOG.info("Read " + editCnt + " edits from log " + log.getName());
@@ -815,7 +828,7 @@ public class TransactionManager extends AbstractService {
   }
 
   private Transaction startTx(long expiration, TransactionType type, @Nullable String clientId) {
-    Transaction tx = null;
+    Transaction tx;
     long txid;
     // guard against changes to the transaction log while processing
     this.logReadLock.lock();
@@ -824,7 +837,8 @@ public class TransactionManager extends AbstractService {
         ensureAvailable();
         txid = getNextWritePointer();
         tx = createTransaction(txid, type);
-        addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type, clientId);
+        addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration, type,
+                                retainClientId ? clientId : null);
       }
       // appending to WAL out of global lock for concurrent performance
       // we should still be able to arrive at the same state even if log entries are out of order
@@ -853,46 +867,42 @@ public class TransactionManager extends AbstractService {
     }
   }
 
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
-    throws TransactionNotInProgressException, TransactionSizeException {
+  public void canCommit(long txId, Collection<byte[]> changeIds)
+    throws TransactionNotInProgressException, TransactionSizeException, TransactionConflictException {
 
     txMetricsCollector.rate("canCommit");
     Stopwatch timer = new Stopwatch().start();
-    InProgressTx inProgressTx = inProgress.get(tx.getTransactionId());
+    InProgressTx inProgressTx = inProgress.get(txId);
     if (inProgressTx == null) {
       synchronized (this) {
         // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
-        if (invalidTxList.contains(tx.getTransactionId())) {
+        if (invalidTxList.contains(txId)) {
           throw new TransactionNotInProgressException(
             String.format(
-              "canCommit() is called for transaction %d that is not in progress (it is known to be invalid)",
-              tx.getTransactionId()));
+              "canCommit() is called for transaction %d that is not in progress (it is known to be invalid)", txId));
         } else {
           throw new TransactionNotInProgressException(
-            String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId()));
+            String.format("canCommit() is called for transaction %d that is not in progress", txId));
         }
       }
     }
 
     Set<ChangeId> set =
-      validateChangeSet(tx, changeIds, inProgressTx.clientId != null ? inProgressTx.clientId : DEFAULT_CLIENTID);
+      validateChangeSet(txId, changeIds, inProgressTx.clientId != null ? inProgressTx.clientId : DEFAULT_CLIENTID);
+    checkForConflicts(txId, set);
 
-    if (hasConflicts(tx, set)) {
-      return false;
-    }
     // guard against changes to the transaction log while processing
     this.logReadLock.lock();
     try {
       synchronized (this) {
         ensureAvailable();
-        addCommittingChangeSet(tx.getTransactionId(), set);
+        addCommittingChangeSet(txId, inProgressTx.getClientId(), set);
       }
-      appendToLog(TransactionEdit.createCommitting(tx.getTransactionId(), set));
+      appendToLog(TransactionEdit.createCommitting(txId, set));
     } finally {
       this.logReadLock.unlock();
     }
     txMetricsCollector.histogram("canCommit.latency", (int) timer.elapsedMillis());
-    return true;
   }
 
   /**
@@ -905,19 +915,19 @@ public class TransactionManager extends AbstractService {
    * @return the same set of changes, transformed into a set of {@link ChangeId}s.
    * @throws TransactionSizeException if the number or total size of the changes exceed the limit.
    */
-  private Set<ChangeId> validateChangeSet(Transaction tx, Collection<byte[]> changeIds,
+  private Set<ChangeId> validateChangeSet(long txId, Collection<byte[]> changeIds,
                                           String clientId) throws TransactionSizeException {
     if (changeIds.size() > changeSetCountLimit) {
       LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries and exceeds " +
                  "the allowed size of {}. Limit the number of changes, or use a long-running transaction. ",
-               tx.getTransactionId(), clientId, changeIds.size(), changeSetCountLimit);
+               txId, clientId, changeIds.size(), changeSetCountLimit);
       throw new TransactionSizeException(String.format(
         "Change set for transaction %d has %d entries and exceeds the limit of %d",
-        tx.getTransactionId(), changeIds.size(), changeSetCountLimit));
+        txId, changeIds.size(), changeSetCountLimit));
     } else if (changeIds.size() > changeSetCountThreshold) {
       LOG.warn("Change set for transaction {} belonging to client '{}' has {} entries. " +
                  "It is recommended to limit the number of changes to {}, or to use a long-running transaction. ",
-               tx.getTransactionId(), clientId, changeIds.size(), changeSetCountThreshold);
+               txId, clientId, changeIds.size(), changeSetCountThreshold);
     }
     long byteCount = 0L;
     Set<ChangeId> set = Sets.newHashSetWithExpectedSize(changeIds.size());
@@ -928,26 +938,28 @@ public class TransactionManager extends AbstractService {
     if (byteCount > changeSetSizeLimit) {
       LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes and exceeds " +
                  "the allowed size of {} bytes. Limit the total size of changes, or use a long-running transaction. ",
-               tx.getTransactionId(), clientId, byteCount, changeSetSizeLimit);
+               txId, clientId, byteCount, changeSetSizeLimit);
       throw new TransactionSizeException(String.format(
         "Change set for transaction %d has total size of %d bytes and exceeds the limit of %d bytes",
-        tx.getTransactionId(), byteCount, changeSetSizeLimit));
+        txId, byteCount, changeSetSizeLimit));
     } else if (byteCount > changeSetSizeThreshold) {
       LOG.warn("Change set for transaction {} belonging to client '{}' has total size of {} bytes. " +
                  "It is recommended to limit the total size to {} bytes, or to use a long-running transaction. ",
-               tx.getTransactionId(), clientId, byteCount, changeSetSizeThreshold);
+               txId, clientId, byteCount, changeSetSizeThreshold);
     }
     return set;
   }
 
-  private void addCommittingChangeSet(long writePointer, Set<ChangeId> changes) {
-    committingChangeSets.put(writePointer, changes);
+  private void addCommittingChangeSet(long writePointer, String clientId, Set<ChangeId> changes) {
+    committingChangeSets.put(writePointer, new ChangeSet(retainClientIdPastCommit ? clientId : null, changes));
   }
 
-  public boolean commit(Transaction tx) throws TransactionNotInProgressException {
+  public void commit(long txId, long writePointer)
+    throws TransactionNotInProgressException, TransactionConflictException {
+
     txMetricsCollector.rate("commit");
     Stopwatch timer = new Stopwatch().start();
-    Set<ChangeId> changeSet = null;
+    ChangeSet changeSet;
     boolean addToCommitted = true;
     long commitPointer;
     // guard against changes to the transaction log while processing
@@ -958,57 +970,55 @@ public class TransactionManager extends AbstractService {
         // we record commits at the first not-yet assigned transaction id to simplify clearing out change sets that
         // are no longer visible by any in-progress transactions
         commitPointer = lastWritePointer + 1;
-        if (inProgress.get(tx.getTransactionId()) == null) {
+        if (inProgress.get(txId) == null) {
           // invalid transaction, either this has timed out and moved to invalid, or something else is wrong.
-          if (invalidTxList.contains(tx.getTransactionId())) {
+          if (invalidTxList.contains(txId)) {
             throw new TransactionNotInProgressException(
               String.format("canCommit() is called for transaction %d that is not in progress " +
-                              "(it is known to be invalid)", tx.getTransactionId()));
+                              "(it is known to be invalid)", txId));
           } else {
             throw new TransactionNotInProgressException(
-              String.format("canCommit() is called for transaction %d that is not in progress", tx.getTransactionId()));
+              String.format("canCommit() is called for transaction %d that is not in progress", txId));
           }
         }
 
         // these should be atomic
         // NOTE: whether we succeed or not we don't need to keep changes in committing state: same tx cannot
         //       be attempted to commit twice
-        changeSet = committingChangeSets.remove(tx.getTransactionId());
+        changeSet = committingChangeSets.remove(txId);
 
         if (changeSet != null) {
           // double-checking if there are conflicts: someone may have committed since canCommit check
-          if (hasConflicts(tx, changeSet)) {
-            return false;
-          }
+          checkForConflicts(txId, changeSet.getChangeIds());
         } else {
           // no changes
           addToCommitted = false;
         }
-        doCommit(tx.getTransactionId(), tx.getWritePointer(), changeSet, commitPointer, addToCommitted);
+        doCommit(txId, writePointer, changeSet, commitPointer, addToCommitted);
       }
-      appendToLog(TransactionEdit.createCommitted(tx.getTransactionId(), changeSet, commitPointer, addToCommitted));
+      appendToLog(TransactionEdit.createCommitted(txId, changeSet == null ? null : changeSet.getChangeIds(),
+                                                  commitPointer, addToCommitted));
     } finally {
       this.logReadLock.unlock();
     }
     txMetricsCollector.histogram("commit.latency", (int) timer.elapsedMillis());
-    return true;
   }
 
-  private void doCommit(long transactionId, long writePointer, Set<ChangeId> changes, long commitPointer,
+  private void doCommit(long transactionId, long writePointer, ChangeSet changes, long commitPointer,
                         boolean addToCommitted) {
     // In case this method is called when loading a previous WAL, we need to remove the tx from these sets
     committingChangeSets.remove(transactionId);
-    if (addToCommitted && !changes.isEmpty()) {
+    if (addToCommitted && !changes.getChangeIds().isEmpty()) {
       // No need to add empty changes to the committed change sets, they will never trigger any conflict
 
       // Record the committed change set with the next writePointer as the commit time.
       // NOTE: we use current next writePointer as key for the map, hence we may have multiple txs changesets to be
       //       stored under one key
-      Set<ChangeId> changeIds = committedChangeSets.get(commitPointer);
-      if (changeIds != null) {
+      ChangeSet committed = committedChangeSets.get(commitPointer);
+      if (committed != null) {
         // NOTE: we modify the new set to prevent concurrent modification exception, as other threads (e.g. in
         // canCommit) use it unguarded
-        changes.addAll(changeIds);
+        changes.getChangeIds().addAll(committed.getChangeIds());
       }
       committedChangeSets.put(commitPointer, changes);
     }
@@ -1112,7 +1122,7 @@ public class TransactionManager extends AbstractService {
   }
 
   private boolean doInvalidate(long writePointer) {
-    Set<ChangeId> previousChangeSet = committingChangeSets.remove(writePointer);
+    ChangeSet previousChangeSet = committingChangeSets.remove(writePointer);
     // remove from in-progress set, so that it does not get excluded in the future
     InProgressTx previous = inProgress.remove(writePointer);
 
@@ -1224,9 +1234,9 @@ public class TransactionManager extends AbstractService {
     txMetricsCollector.rate("checkpoint");
     Stopwatch timer = new Stopwatch().start();
 
-    Transaction checkpointedTx = null;
+    Transaction checkpointedTx;
     long txId = originalTx.getTransactionId();
-    long newWritePointer = 0;
+    long newWritePointer;
     // guard against changes to the transaction log while processing
     this.logReadLock.lock();
     try {
@@ -1284,39 +1294,49 @@ public class TransactionManager extends AbstractService {
     return this.committedChangeSets.size();
   }
 
-  private boolean hasConflicts(Transaction tx, Set<ChangeId> changeIds) {
+  @Nullable
+  @VisibleForTesting
+  InProgressTx getInProgress(long transactionId) {
+    return inProgress.get(transactionId);
+  }
+
+  private void checkForConflicts(long txId, Set<ChangeId> changeIds) throws TransactionConflictException {
     if (changeIds.isEmpty()) {
-      return false;
+      return;
     }
 
-    for (Map.Entry<Long, Set<ChangeId>> changeSet : committedChangeSets.entrySet()) {
+    for (Map.Entry<Long, ChangeSet> committed : committedChangeSets.entrySet()) {
       // If commit time is greater than tx read-pointer,
       // basically not visible but committed means "tx committed after given tx was started"
-      if (changeSet.getKey() > tx.getTransactionId()) {
-        if (overlap(changeSet.getValue(), changeIds)) {
-          return true;
+      if (committed.getKey() > txId) {
+        ChangeId change = overlap(committed.getValue().getChangeIds(), changeIds);
+        if (change != null) {
+          throw new TransactionConflictException(txId, change.toString(), committed.getValue().getClientId());
         }
       }
     }
-    return false;
   }
 
-  private boolean overlap(Set<ChangeId> a, Set<ChangeId> b) {
+  /**
+   * Checks for overlap in two change sets, returns the first common change it finds, or null if no overlap.
+   */
+  @Nullable
+  private ChangeId overlap(Set<ChangeId> a, Set<ChangeId> b) {
     // iterate over the smaller set, and check for every element in the other set
     if (a.size() > b.size()) {
       for (ChangeId change : b) {
         if (a.contains(change)) {
-          return true;
+          return change;
         }
       }
     } else {
       for (ChangeId change : a) {
         if (b.contains(change)) {
-          return true;
+          return change;
         }
       }
     }
-    return false;
+    return null;
   }
 
   private void moveReadPointerIfNeeded(long committedWritePointer) {
@@ -1385,9 +1405,9 @@ public class TransactionManager extends AbstractService {
   }
 
   private abstract static class DaemonThreadExecutor extends Thread {
-    private AtomicBoolean stopped = new AtomicBoolean(false);
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
 
-    public DaemonThreadExecutor(String name) {
+    DaemonThreadExecutor(String name) {
       super(name);
       setDaemon(true);
     }
@@ -1577,4 +1597,25 @@ public class TransactionManager extends AbstractService {
     }
   }
 
+  /**
+   * Represents a set of changes from a client.
+   */
+  public static class ChangeSet {
+    final String clientId;
+    final Set<ChangeId> changeIds;
+
+    ChangeSet(@Nullable String clientId, Set<ChangeId> changeIds) {
+      this.clientId = clientId;
+      this.changeIds = changeIds;
+    }
+
+    @Nullable
+    public String getClientId() {
+      return clientId;
+    }
+
+    public Set<ChangeId> getChangeIds() {
+      return changeIds;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
index a44f131..29fbe62 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
@@ -60,7 +60,7 @@ public interface TransactionSystemClient {
   /**
    * Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and
    * refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call
-   * and {@link #commit(Transaction)} which may check conflicts again to avoid races.
+   * and {@link #commitOrThrow(Transaction)} which will check conflicts again to avoid races.
    * <p/>
    * Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts
    * before we persist changes to avoid rollback in case of conflicts as much as possible.
@@ -80,7 +80,7 @@ public interface TransactionSystemClient {
   /**
    * Checks if transaction with the set of changes can be committed. E.g. it can check conflicts with other changes and
    * refuse commit if there are conflicts. It is assumed that no other changes will be done in between this method call
-   * and {@link #commit(Transaction)} which may check conflicts again to avoid races.
+   * and {@link #commitOrThrow(Transaction)} which will check conflicts again to avoid races.
    * <p/>
    * Since we do conflict detection at commit time as well, this may seem redundant. The idea is to check for conflicts
    * before we persist changes to avoid rollback in case of conflicts as much as possible.
@@ -89,21 +89,38 @@ public interface TransactionSystemClient {
    *
    * @param tx transaction to verify
    * @param changeIds ids of changes made by transaction
-   * @return true if transaction can be committed otherwise false
-   * @throws TransactionSizeException if the size of the chgange set exceeds the allowed limit
-   * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out.
+   *
+   * @throws TransactionSizeException if the size of the change set exceeds the allowed limit
+   * @throws TransactionConflictException if the change set has a conflict with an overlapping transaction
+   * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out
    */
-  boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) throws TransactionFailureException;
+  void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
+    throws TransactionFailureException;
 
   /**
    * Makes transaction visible. It will again check conflicts of changes submitted previously with
-   * {@link #canCommit(Transaction, java.util.Collection)}
+   * {@link #canCommitOrThrow(Transaction, java.util.Collection)}
+   *
    * @param tx transaction to make visible.
    * @return true if transaction can be committed otherwise false
+   *
+   * @deprecated as of 0.13-incubating. Use {@link #canCommitOrThrow(Transaction, Collection)} instead.
    */
+  @Deprecated
   boolean commit(Transaction tx) throws TransactionNotInProgressException;
 
   /**
+   * Makes transaction visible. It will again check conflicts of changes submitted previously with
+   * {@link #canCommitOrThrow(Transaction, java.util.Collection)}
+   *
+   * @param tx transaction to make visible.
+   *
+   * @throws TransactionConflictException if the transaction has a conflict with an overlapping transaction
+   * @throws TransactionNotInProgressException if the transaction is not in progress; most likely it has timed out
+   */
+  void commitOrThrow(Transaction tx) throws TransactionFailureException;
+
+  /**
    * Makes transaction visible. You should call it only when all changes of this tx are undone.
    * NOTE: it will not throw {@link TransactionNotInProgressException} if transaction has timed out.
    * @param tx transaction to make visible.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 5c78aa4..3a6b70a 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -196,6 +196,17 @@ public class TxConstants {
     public static final long DEFAULT_TX_CHANGESET_SIZE_LIMIT = Long.MAX_VALUE;
     /** The default warning threshold for the total size in bytes of a change set is unlimited. */
     public static final long DEFAULT_TX_CHANGESET_SIZE_WARN_THRESHOLD = Long.MAX_VALUE;
+
+    /** Whether and how long to retain the client id of a transaction. Valid values are:
+     * <ul>
+     *   <li>OFF - do not retain the client id at all</li>
+     *   <li>ACTIVE - retain the client id until a transaction commits, aborts, or is invalidated</li>
+     *   <li>COMMITTED - retain the client id after it commits, as long as it participates in conflict detection</li>
+     * </ul>
+     */
+    public static final String CFG_TX_RETAIN_CLIENT_ID = "data.tx.retain.client.id";
+    /** Default for how long to retain a transaction's client id */
+    public static final String DEFAULT_TX_RETAIN_CLIENT_ID = "COMMITTED";
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
index f1743de..938eef5 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
@@ -27,6 +27,7 @@ import com.google.inject.name.Named;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
 import org.apache.tephra.TransactionFailureException;
 import org.apache.tephra.TransactionNotInProgressException;
@@ -75,7 +76,6 @@ public class TransactionServiceClient implements TransactionSystemClient {
   /**
    * Utility to be used for basic verification of transaction system availability and functioning
    * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx
-   * @throws Exception
    */
   public static void main(String[] args) throws Exception {
     if (args.length > 1 || (args.length == 1 && !"-v".equals(args[0]))) {
@@ -115,19 +115,14 @@ public class TransactionServiceClient implements TransactionSystemClient {
                    ", invalids: " + tx.getInvalids().length +
                    ", inProgress: " + tx.getInProgress().length);
       }
-      LOG.info("Checking if canCommit tx...");
-      boolean canCommit = client.canCommitOrThrow(tx, Collections.<byte[]>emptyList());
-      LOG.info("canCommit: " + canCommit);
-      if (canCommit) {
+      try {
+        LOG.info("Checking if canCommit tx...");
+        client.canCommitOrThrow(tx, Collections.<byte[]>emptyList());
+        LOG.info("canCommit: success");
         LOG.info("Committing tx...");
-        boolean committed = client.commit(tx);
-        LOG.info("Committed tx: " + committed);
-        if (!committed) {
-          LOG.info("Aborting tx...");
-          client.abort(tx);
-          LOG.info("Aborted tx...");
-        }
-      } else {
+        client.commitOrThrow(tx);
+        LOG.info("Committed tx: success");
+      } catch (TransactionConflictException e) {
         LOG.info("Aborting tx...");
         client.abort(tx);
         LOG.info("Aborted tx...");
@@ -322,25 +317,16 @@ public class TransactionServiceClient implements TransactionSystemClient {
   @Override
   public boolean canCommit(final Transaction tx, final Collection<byte[]> changeIds)
     throws TransactionNotInProgressException {
-
     try {
-      return execute(
-        new Operation<Boolean>("canCommit") {
-          @Override
-          public Boolean execute(TransactionServiceThriftClient client)
-            throws Exception {
-            return client.canCommit(tx, changeIds);
-          }
-        });
-    } catch (TransactionNotInProgressException e) {
-      throw e;
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
+      canCommitOrThrow(tx, changeIds);
+      return true;
+    } catch (TransactionFailureException e) {
+      return false;
     }
   }
 
   @Override
-  public boolean canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds)
+  public void canCommitOrThrow(final Transaction tx, final Collection<byte[]> changeIds)
     throws TransactionFailureException {
 
     // we want to validate the size of the change set here before sending it over the wire.
@@ -369,15 +355,16 @@ public class TransactionServiceClient implements TransactionSystemClient {
     }
 
     try {
-      return execute(
-        new Operation<Boolean>("canCommit") {
+      execute(
+        new Operation<Void>("canCommit") {
           @Override
-          public Boolean execute(TransactionServiceThriftClient client)
+          public Void execute(TransactionServiceThriftClient client)
             throws Exception {
-            return client.canCommitOrThrow(tx, changeIds);
+            client.canCommit(tx, changeIds);
+            return null;
           }
         });
-    } catch (TransactionNotInProgressException | TransactionSizeException e) {
+    } catch (TransactionNotInProgressException | TransactionSizeException | TransactionConflictException e) {
       throw e;
     } catch (Exception e) {
       throw Throwables.propagate(e);
@@ -387,15 +374,26 @@ public class TransactionServiceClient implements TransactionSystemClient {
   @Override
   public boolean commit(final Transaction tx) throws TransactionNotInProgressException {
     try {
-      return this.execute(
-        new Operation<Boolean>("commit") {
+      commitOrThrow(tx);
+      return true;
+    } catch (TransactionFailureException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public void commitOrThrow(final Transaction tx)
+    throws TransactionFailureException {
+    try {
+      execute(
+        new Operation<Void>("commit") {
           @Override
-          public Boolean execute(TransactionServiceThriftClient client)
-            throws Exception {
-            return client.commit(tx);
+          public Void execute(TransactionServiceThriftClient client) throws Exception {
+            client.commit(tx.getTransactionId(), tx.getWritePointer());
+            return null;
           }
         });
-    } catch (TransactionNotInProgressException e) {
+    } catch (TransactionNotInProgressException | TransactionConflictException e) {
       throw e;
     } catch (Exception e) {
       throw Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
index ba37243..6ce7b84 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
@@ -23,11 +23,13 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionCouldNotTakeSnapshotException;
 import org.apache.tephra.TransactionNotInProgressException;
 import org.apache.tephra.TransactionSizeException;
 import org.apache.tephra.distributed.thrift.TGenericException;
 import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException;
+import org.apache.tephra.distributed.thrift.TTransactionConflictException;
 import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException;
 import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException;
 import org.apache.tephra.distributed.thrift.TTransactionServer;
@@ -64,12 +66,12 @@ public class TransactionServiceThriftClient {
   /**
    * The thrift transport layer. We need this when we close the connection.
    */
-  TTransport transport;
+  private TTransport transport;
 
   /**
    * The actual thrift client.
    */
-  TTransactionServer.Client client;
+  private TTransactionServer.Client client;
 
   /**
    * Whether this client is valid for use.
@@ -184,30 +186,19 @@ public class TransactionServiceThriftClient {
     }
   }
 
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
-    throws TException, TransactionNotInProgressException {
+  public void canCommit(Transaction tx, Collection<byte[]> changeIds)
+    throws TException, TransactionNotInProgressException, TransactionSizeException, TransactionConflictException {
     try {
-      return client.canCommitTx(TransactionConverterUtils.wrap(tx),
-                                ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue();
-    } catch (TTransactionNotInProgressException e) {
-      throw new TransactionNotInProgressException(e.getMessage());
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
-    throws TException, TransactionNotInProgressException, TransactionSizeException {
-    try {
-      return client.canCommitTx(TransactionConverterUtils.wrap(tx),
-                                ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue();
+      client.canCommitOrThrow(tx.getTransactionId(),
+                              ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER)));
     } catch (TTransactionNotInProgressException e) {
       throw new TransactionNotInProgressException(e.getMessage());
+    } catch (TTransactionConflictException e) {
+      throw new TransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient());
     } catch (TGenericException e) {
       // currently, we only expect TransactionSizeException here
       if (!TransactionSizeException.class.getName().equals(e.getOriginalExceptionClass())) {
-        LOG.trace("Expecting only {} as the original exception class but found {}",
+        LOG.debug("Expecting only {} as the original exception class but found {}",
                   TransactionSizeException.class.getName(), e.getOriginalExceptionClass());
         throw e;
       }
@@ -218,11 +209,18 @@ public class TransactionServiceThriftClient {
     }
   }
 
-  public boolean commit(Transaction tx) throws TException, TransactionNotInProgressException {
+  public void commit(long txId, long wp)
+    throws TException, TransactionNotInProgressException, TransactionConflictException {
     try {
-      return client.commitTx(TransactionConverterUtils.wrap(tx)).isValue();
+      client.commitOrThrow(txId, wp);
     } catch (TTransactionNotInProgressException e) {
       throw new TransactionNotInProgressException(e.getMessage());
+    } catch (TTransactionConflictException e) {
+      throw new TransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient());
+    } catch (TGenericException e) {
+      // we never throw this from commitOrThrow() - it was added as place holder to avoid future thrift API changes
+      LOG.debug("Unexpected {} from commitOrThrow()", TGenericException.class.getName());
+      throw e;
     } catch (TException e) {
       isValid.set(false);
       throw e;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
index 0c9105b..95988c0 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
@@ -20,6 +20,7 @@ package org.apache.tephra.distributed;
 
 import com.google.common.collect.Sets;
 import org.apache.tephra.InvalidTruncateTimeException;
+import org.apache.tephra.TransactionConflictException;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionNotInProgressException;
 import org.apache.tephra.TransactionSizeException;
@@ -28,6 +29,7 @@ import org.apache.tephra.distributed.thrift.TBoolean;
 import org.apache.tephra.distributed.thrift.TGenericException;
 import org.apache.tephra.distributed.thrift.TInvalidTruncateTimeException;
 import org.apache.tephra.distributed.thrift.TTransaction;
+import org.apache.tephra.distributed.thrift.TTransactionConflictException;
 import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException;
 import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException;
 import org.apache.tephra.distributed.thrift.TTransactionServer;
@@ -119,25 +121,16 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface
 
   @Override
   public TBoolean canCommitTx(TTransaction tx, Set<ByteBuffer> changes) throws TException {
-
-    Set<byte[]> changeIds = Sets.newHashSet();
-    for (ByteBuffer bb : changes) {
-      byte[] changeId = new byte[bb.remaining()];
-      bb.get(changeId);
-      changeIds.add(changeId);
-    }
     try {
-      return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds));
-    } catch (TransactionNotInProgressException e) {
-      throw new TTransactionNotInProgressException(e.getMessage());
-    } catch (TransactionSizeException e) {
-      return new TBoolean(false); // can't throw exception -> just indicate that it failed
+      canCommitOrThrow(tx.getTransactionId(), changes);
+      return new TBoolean(true);
+    } catch (TTransactionConflictException | TGenericException e) {
+      return new TBoolean(false);
     }
   }
 
   @Override
-  public TBoolean canCommitOrThrow(TTransaction tx, Set<ByteBuffer> changes) throws TException {
-
+  public void canCommitOrThrow(long txId, Set<ByteBuffer> changes) throws TException {
     Set<byte[]> changeIds = Sets.newHashSet();
     for (ByteBuffer bb : changes) {
       byte[] changeId = new byte[bb.remaining()];
@@ -145,20 +138,34 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface
       changeIds.add(changeId);
     }
     try {
-      return new TBoolean(txManager.canCommit(TransactionConverterUtils.unwrap(tx), changeIds));
+      txManager.canCommit(txId, changeIds);
     } catch (TransactionNotInProgressException e) {
       throw new TTransactionNotInProgressException(e.getMessage());
     } catch (TransactionSizeException e) {
       throw new TGenericException(e.getMessage(), TransactionSizeException.class.getName());
+    } catch (TransactionConflictException e) {
+      throw new TTransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient());
     }
   }
 
   @Override
   public TBoolean commitTx(TTransaction tx) throws TException {
     try {
-      return new TBoolean(txManager.commit(TransactionConverterUtils.unwrap(tx)));
+      commitOrThrow(tx.getTransactionId(), tx.getWritePointer());
+      return new TBoolean(true);
+    } catch (TTransactionConflictException | TGenericException e) {
+      return new TBoolean(false);
+    }
+  }
+
+  @Override
+  public void commitOrThrow(long txId, long wp) throws TException {
+    try {
+      txManager.commit(txId, wp);
     } catch (TransactionNotInProgressException e) {
       throw new TTransactionNotInProgressException(e.getMessage());
+    } catch (TransactionConflictException e) {
+      throw new TTransactionConflictException(e.getTransactionId(), e.getConflictingKey(), e.getConflictingClient());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/174c3325/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java
new file mode 100644
index 0000000..d7c6c9f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionConflictException.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.tephra.distributed.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TTransactionConflictException extends TException implements org.apache.thrift.TBase<TTransactionConflictException, TTransactionConflictException._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TTransactionConflictException");
+
+  private static final org.apache.thrift.protocol.TField TRANSACTION_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("transactionId", org.apache.thrift.protocol.TType.I64, (short)1);
+  private static final org.apache.thrift.protocol.TField CONFLICTING_KEY_FIELD_DESC = new org.apache.thrift.protocol.TField("conflictingKey", org.apache.thrift.protocol.TType.STRING, (short)2);
+  private static final org.apache.thrift.protocol.TField CONFLICTING_CLIENT_FIELD_DESC = new org.apache.thrift.protocol.TField("conflictingClient", org.apache.thrift.protocol.TType.STRING, (short)3);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new TTransactionConflictExceptionStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new TTransactionConflictExceptionTupleSchemeFactory());
+  }
+
+  public long transactionId; // required
+  public String conflictingKey; // required
+  public String conflictingClient; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    TRANSACTION_ID((short)1, "transactionId"),
+    CONFLICTING_KEY((short)2, "conflictingKey"),
+    CONFLICTING_CLIENT((short)3, "conflictingClient");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // TRANSACTION_ID
+          return TRANSACTION_ID;
+        case 2: // CONFLICTING_KEY
+          return CONFLICTING_KEY;
+        case 3: // CONFLICTING_CLIENT
+          return CONFLICTING_CLIENT;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __TRANSACTIONID_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.TRANSACTION_ID, new org.apache.thrift.meta_data.FieldMetaData("transactionId", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    tmpMap.put(_Fields.CONFLICTING_KEY, new org.apache.thrift.meta_data.FieldMetaData("conflictingKey", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.CONFLICTING_CLIENT, new org.apache.thrift.meta_data.FieldMetaData("conflictingClient", org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TTransactionConflictException.class, metaDataMap);
+  }
+
+  public TTransactionConflictException() {
+  }
+
+  public TTransactionConflictException(
+    long transactionId,
+    String conflictingKey,
+    String conflictingClient)
+  {
+    this();
+    this.transactionId = transactionId;
+    setTransactionIdIsSet(true);
+    this.conflictingKey = conflictingKey;
+    this.conflictingClient = conflictingClient;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public TTransactionConflictException(TTransactionConflictException other) {
+    __isset_bitfield = other.__isset_bitfield;
+    this.transactionId = other.transactionId;
+    if (other.isSetConflictingKey()) {
+      this.conflictingKey = other.conflictingKey;
+    }
+    if (other.isSetConflictingClient()) {
+      this.conflictingClient = other.conflictingClient;
+    }
+  }
+
+  public TTransactionConflictException deepCopy() {
+    return new TTransactionConflictException(this);
+  }
+
+  @Override
+  public void clear() {
+    setTransactionIdIsSet(false);
+    this.transactionId = 0;
+    this.conflictingKey = null;
+    this.conflictingClient = null;
+  }
+
+  public long getTransactionId() {
+    return this.transactionId;
+  }
+
+  public TTransactionConflictException setTransactionId(long transactionId) {
+    this.transactionId = transactionId;
+    setTransactionIdIsSet(true);
+    return this;
+  }
+
+  public void unsetTransactionId() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TRANSACTIONID_ISSET_ID);
+  }
+
+  /** Returns true if field transactionId is set (has been assigned a value) and false otherwise */
+  public boolean isSetTransactionId() {
+    return EncodingUtils.testBit(__isset_bitfield, __TRANSACTIONID_ISSET_ID);
+  }
+
+  public void setTransactionIdIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TRANSACTIONID_ISSET_ID, value);
+  }
+
+  public String getConflictingKey() {
+    return this.conflictingKey;
+  }
+
+  public TTransactionConflictException setConflictingKey(String conflictingKey) {
+    this.conflictingKey = conflictingKey;
+    return this;
+  }
+
+  public void unsetConflictingKey() {
+    this.conflictingKey = null;
+  }
+
+  /** Returns true if field conflictingKey is set (has been assigned a value) and false otherwise */
+  public boolean isSetConflictingKey() {
+    return this.conflictingKey != null;
+  }
+
+  public void setConflictingKeyIsSet(boolean value) {
+    if (!value) {
+      this.conflictingKey = null;
+    }
+  }
+
+  public String getConflictingClient() {
+    return this.conflictingClient;
+  }
+
+  public TTransactionConflictException setConflictingClient(String conflictingClient) {
+    this.conflictingClient = conflictingClient;
+    return this;
+  }
+
+  public void unsetConflictingClient() {
+    this.conflictingClient = null;
+  }
+
+  /** Returns true if field conflictingClient is set (has been assigned a value) and false otherwise */
+  public boolean isSetConflictingClient() {
+    return this.conflictingClient != null;
+  }
+
+  public void setConflictingClientIsSet(boolean value) {
+    if (!value) {
+      this.conflictingClient = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case TRANSACTION_ID:
+      if (value == null) {
+        unsetTransactionId();
+      } else {
+        setTransactionId((Long)value);
+      }
+      break;
+
+    case CONFLICTING_KEY:
+      if (value == null) {
+        unsetConflictingKey();
+      } else {
+        setConflictingKey((String)value);
+      }
+      break;
+
+    case CONFLICTING_CLIENT:
+      if (value == null) {
+        unsetConflictingClient();
+      } else {
+        setConflictingClient((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case TRANSACTION_ID:
+      return Long.valueOf(getTransactionId());
+
+    case CONFLICTING_KEY:
+      return getConflictingKey();
+
+    case CONFLICTING_CLIENT:
+      return getConflictingClient();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case TRANSACTION_ID:
+      return isSetTransactionId();
+    case CONFLICTING_KEY:
+      return isSetConflictingKey();
+    case CONFLICTING_CLIENT:
+      return isSetConflictingClient();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof TTransactionConflictException)
+      return this.equals((TTransactionConflictException)that);
+    return false;
+  }
+
+  public boolean equals(TTransactionConflictException that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_transactionId = true;
+    boolean that_present_transactionId = true;
+    if (this_present_transactionId || that_present_transactionId) {
+      if (!(this_present_transactionId && that_present_transactionId))
+        return false;
+      if (this.transactionId != that.transactionId)
+        return false;
+    }
+
+    boolean this_present_conflictingKey = true && this.isSetConflictingKey();
+    boolean that_present_conflictingKey = true && that.isSetConflictingKey();
+    if (this_present_conflictingKey || that_present_conflictingKey) {
+      if (!(this_present_conflictingKey && that_present_conflictingKey))
+        return false;
+      if (!this.conflictingKey.equals(that.conflictingKey))
+        return false;
+    }
+
+    boolean this_present_conflictingClient = true && this.isSetConflictingClient();
+    boolean that_present_conflictingClient = true && that.isSetConflictingClient();
+    if (this_present_conflictingClient || that_present_conflictingClient) {
+      if (!(this_present_conflictingClient && that_present_conflictingClient))
+        return false;
+      if (!this.conflictingClient.equals(that.conflictingClient))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  public int compareTo(TTransactionConflictException other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+    TTransactionConflictException typedOther = (TTransactionConflictException)other;
+
+    lastComparison = Boolean.valueOf(isSetTransactionId()).compareTo(typedOther.isSetTransactionId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTransactionId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.transactionId, typedOther.transactionId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetConflictingKey()).compareTo(typedOther.isSetConflictingKey());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetConflictingKey()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.conflictingKey, typedOther.conflictingKey);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetConflictingClient()).compareTo(typedOther.isSetConflictingClient());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetConflictingClient()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.conflictingClient, typedOther.conflictingClient);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("TTransactionConflictException(");
+    boolean first = true;
+
+    sb.append("transactionId:");
+    sb.append(this.transactionId);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("conflictingKey:");
+    if (this.conflictingKey == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.conflictingKey);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("conflictingClient:");
+    if (this.conflictingClient == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.conflictingClient);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class TTransactionConflictExceptionStandardSchemeFactory implements SchemeFactory {
+    public TTransactionConflictExceptionStandardScheme getScheme() {
+      return new TTransactionConflictExceptionStandardScheme();
+    }
+  }
+
+  private static class TTransactionConflictExceptionStandardScheme extends StandardScheme<TTransactionConflictException> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, TTransactionConflictException struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // TRANSACTION_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.transactionId = iprot.readI64();
+              struct.setTransactionIdIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // CONFLICTING_KEY
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.conflictingKey = iprot.readString();
+              struct.setConflictingKeyIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // CONFLICTING_CLIENT
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.conflictingClient = iprot.readString();
+              struct.setConflictingClientIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, TTransactionConflictException struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      oprot.writeFieldBegin(TRANSACTION_ID_FIELD_DESC);
+      oprot.writeI64(struct.transactionId);
+      oprot.writeFieldEnd();
+      if (struct.conflictingKey != null) {
+        oprot.writeFieldBegin(CONFLICTING_KEY_FIELD_DESC);
+        oprot.writeString(struct.conflictingKey);
+        oprot.writeFieldEnd();
+      }
+      if (struct.conflictingClient != null) {
+        oprot.writeFieldBegin(CONFLICTING_CLIENT_FIELD_DESC);
+        oprot.writeString(struct.conflictingClient);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class TTransactionConflictExceptionTupleSchemeFactory implements SchemeFactory {
+    public TTransactionConflictExceptionTupleScheme getScheme() {
+      return new TTransactionConflictExceptionTupleScheme();
+    }
+  }
+
+  private static class TTransactionConflictExceptionTupleScheme extends TupleScheme<TTransactionConflictException> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, TTransactionConflictException struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      BitSet optionals = new BitSet();
+      if (struct.isSetTransactionId()) {
+        optionals.set(0);
+      }
+      if (struct.isSetConflictingKey()) {
+        optionals.set(1);
+      }
+      if (struct.isSetConflictingClient()) {
+        optionals.set(2);
+      }
+      oprot.writeBitSet(optionals, 3);
+      if (struct.isSetTransactionId()) {
+        oprot.writeI64(struct.transactionId);
+      }
+      if (struct.isSetConflictingKey()) {
+        oprot.writeString(struct.conflictingKey);
+      }
+      if (struct.isSetConflictingClient()) {
+        oprot.writeString(struct.conflictingClient);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, TTransactionConflictException struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(3);
+      if (incoming.get(0)) {
+        struct.transactionId = iprot.readI64();
+        struct.setTransactionIdIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.conflictingKey = iprot.readString();
+        struct.setConflictingKeyIsSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.conflictingClient = iprot.readString();
+        struct.setConflictingClientIsSet(true);
+      }
+    }
+  }
+
+}
+