You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/09 05:54:04 UTC

incubator-tephra git commit: (TEPHRA-250) Allow to trigger transaction pruning

Repository: incubator-tephra
Updated Branches:
  refs/heads/master 39f4fde63 -> 8532076f8


(TEPHRA-250) Allow to trigger transaction pruning

This closes #52

Signed-off-by: anew <an...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/8532076f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/8532076f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/8532076f

Branch: refs/heads/master
Commit: 8532076f802508a4a761408a7501c82463dd3ad5
Parents: 39f4fde
Author: anew <an...@apache.org>
Authored: Fri Sep 8 16:36:29 2017 -0700
Committer: anew <an...@apache.org>
Committed: Fri Sep 8 22:52:36 2017 -0700

----------------------------------------------------------------------
 .../apache/tephra/TransactionSystemClient.java  |   5 +
 .../tephra/distributed/TransactionService.java  |  28 +-
 .../distributed/TransactionServiceClient.java   |  18 +
 .../TransactionServiceThriftClient.java         |   9 +
 .../TransactionServiceThriftHandler.java        |  12 +-
 .../distributed/thrift/TTransactionServer.java  | 565 +++++++++++++++++++
 .../tephra/inmemory/DetachedTxSystemClient.java |   5 +
 .../tephra/inmemory/InMemoryTxSystemClient.java |   6 +
 .../tephra/inmemory/MinimalTxSystemClient.java  |   5 +
 .../txprune/TransactionPruningService.java      |  19 +-
 tephra-core/src/main/thrift/transaction.thrift  |   1 +
 .../tephra/ThriftTransactionSystemTest.java     |  35 ++
 .../txprune/TransactionPruningServiceTest.java  |  35 +-
 13 files changed, 730 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
index fe4b63e..9702c61 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
@@ -140,4 +140,9 @@ public interface TransactionSystemClient {
    * @return the size of invalid list
    */
   int getInvalidSize();
+
+  /**
+   * Trigger transaction pruning now.
+   */
+  void pruneNow();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
index f95e5b3..b3a9ae7 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
@@ -48,9 +48,9 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 
 /**
- *
+ * Transaction Service that includes Transaction Manager, Thrift Server, and Pruning Service.
  */
-public final class TransactionService extends InMemoryTransactionService {
+public class TransactionService extends InMemoryTransactionService {
   private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class);
   private LeaderElection leaderElection;
   private final Configuration conf;
@@ -97,7 +97,7 @@ public final class TransactionService extends InMemoryTransactionService {
           }
         }, MoreExecutors.sameThreadExecutor());
 
-        pruningService = new TransactionPruningService(conf, txManager);
+        pruningService = createPruningService(conf, txManager);
 
         server = ThriftRPCServer.builder(TTransactionServer.class)
           .setHost(address)
@@ -105,7 +105,7 @@ public final class TransactionService extends InMemoryTransactionService {
           .setWorkerThreads(threads)
           .setMaxReadBufferBytes(maxReadBufferBytes)
           .setIOThreads(ioThreads)
-          .build(new TransactionServiceThriftHandler(txManager));
+          .build(new TransactionServiceThriftHandler(txManager, pruningService));
         try {
           server.startAndWait();
           pruningService.startAndWait();
@@ -142,6 +142,14 @@ public final class TransactionService extends InMemoryTransactionService {
     notifyStarted();
   }
 
+  /**
+   * Called at startup to create the pruning service.
+   */
+  @VisibleForTesting
+  protected TransactionPruningService createPruningService(Configuration conf, TransactionManager txManager) {
+    return new TransactionPruningService(conf, txManager);
+  }
+
   @VisibleForTesting
   State thriftRPCServerState() {
     return server.state();
@@ -179,4 +187,16 @@ public final class TransactionService extends InMemoryTransactionService {
   public TransactionManager getTransactionManager() {
     return txManager;
   }
+
+  /**
+   * This allows systems that embed the transaction service access to the pruning service,
+   * so that they can trigger pruning (rather than waiting for its scheduled run time).
+   *
+   * @return null if pruning is not enabled
+   */
+  @SuppressWarnings({"WeakerAccess", "unused"})
+  @Nullable
+  public TransactionPruningService getTransactionPruningService() {
+    return pruningService;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
index 5f7792a..cdcca7f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
@@ -488,4 +488,22 @@ public class TransactionServiceClient implements TransactionSystemClient {
       throw Throwables.propagate(e);
     }
   }
+
+  @Override
+  public void pruneNow() {
+    try {
+      this.execute(
+        new Operation<Void>("pruneNow") {
+          @Override
+          public Void execute(TransactionServiceThriftClient client)
+            throws TException {
+            client.pruneNow();
+            return null;
+          }
+        });
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
index 8ba81e3..ccd266a 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
@@ -302,6 +302,15 @@ public class TransactionServiceThriftClient {
     }
   }
 
+  public void pruneNow() throws TException {
+    try {
+      client.pruneNow();
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
   public boolean isValid() {
     return isValid.get();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
index 954ee1d..174b463 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
@@ -31,6 +31,7 @@ import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotExce
 import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException;
 import org.apache.tephra.distributed.thrift.TTransactionServer;
 import org.apache.tephra.rpc.RPCServiceHandler;
+import org.apache.tephra.txprune.TransactionPruningService;
 import org.apache.thrift.TException;
 
 import java.io.ByteArrayOutputStream;
@@ -57,9 +58,11 @@ import java.util.Set;
 public class TransactionServiceThriftHandler implements TTransactionServer.Iface, RPCServiceHandler {
 
   private final TransactionManager txManager;
+  private final TransactionPruningService pruningService;
 
-  public TransactionServiceThriftHandler(TransactionManager txManager) {
+  public TransactionServiceThriftHandler(TransactionManager txManager, TransactionPruningService pruningService) {
     this.txManager = txManager;
+    this.pruningService = pruningService;
   }
 
   @Override
@@ -206,7 +209,12 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface
     }
   }
 
-  /* RPCServiceHandler implementation */
+  @Override
+  public void pruneNow() throws TException {
+    pruningService.pruneNow();
+  }
+
+  /* RPCServiceHandler implementation */
 
   @Override
   public void init() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
index 634350d..6c99bb4 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
@@ -88,6 +88,8 @@ public class TTransactionServer {
 
     public TTransaction checkpoint(TTransaction tx) throws TTransactionNotInProgressException, org.apache.thrift.TException;
 
+    public void pruneNow() throws org.apache.thrift.TException;
+
   }
 
   public interface AsyncIface {
@@ -128,6 +130,8 @@ public class TTransactionServer {
 
     public void checkpoint(TTransaction tx, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.checkpoint_call> resultHandler) throws org.apache.thrift.TException;
 
+    public void pruneNow(org.apache.thrift.async.AsyncMethodCallback<AsyncClient.pruneNow_call> resultHandler) throws org.apache.thrift.TException;
+
   }
 
   public static class Client extends org.apache.thrift.TServiceClient implements Iface {
@@ -581,6 +585,25 @@ public class TTransactionServer {
       throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "checkpoint failed: unknown result");
     }
 
+    public void pruneNow() throws org.apache.thrift.TException
+    {
+      send_pruneNow();
+      recv_pruneNow();
+    }
+
+    public void send_pruneNow() throws org.apache.thrift.TException
+    {
+      pruneNow_args args = new pruneNow_args();
+      sendBase("pruneNow", args);
+    }
+
+    public void recv_pruneNow() throws org.apache.thrift.TException
+    {
+      pruneNow_result result = new pruneNow_result();
+      receiveBase(result, "pruneNow");
+      return;
+    }
+
   }
   public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
     public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@@ -1163,6 +1186,35 @@ public class TTransactionServer {
       }
     }
 
+    public void pruneNow(org.apache.thrift.async.AsyncMethodCallback<pruneNow_call> resultHandler) throws org.apache.thrift.TException {
+      checkReady();
+      pruneNow_call method_call = new pruneNow_call(resultHandler, this, ___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class pruneNow_call extends org.apache.thrift.async.TAsyncMethodCall {
+      public pruneNow_call(org.apache.thrift.async.AsyncMethodCallback<pruneNow_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+        prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("pruneNow", org.apache.thrift.protocol.TMessageType.CALL, 0));
+        pruneNow_args args = new pruneNow_args();
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public void getResult() throws org.apache.thrift.TException {
+        if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+        (new Client(prot)).recv_pruneNow();
+      }
+    }
+
   }
 
   public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
@@ -1194,6 +1246,7 @@ public class TTransactionServer {
       processMap.put("truncateInvalidTxBefore", new truncateInvalidTxBefore());
       processMap.put("invalidTxSize", new invalidTxSize());
       processMap.put("checkpoint", new checkpoint());
+      processMap.put("pruneNow", new pruneNow());
       return processMap;
     }
 
@@ -1595,6 +1648,26 @@ public class TTransactionServer {
       }
     }
 
+    public static class pruneNow<I extends Iface> extends org.apache.thrift.ProcessFunction<I, pruneNow_args> {
+      public pruneNow() {
+        super("pruneNow");
+      }
+
+      public pruneNow_args getEmptyArgsInstance() {
+        return new pruneNow_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public pruneNow_result getResult(I iface, pruneNow_args args) throws org.apache.thrift.TException {
+        pruneNow_result result = new pruneNow_result();
+        iface.pruneNow();
+        return result;
+      }
+    }
+
   }
 
   public static class startLong_args implements org.apache.thrift.TBase<startLong_args, startLong_args._Fields>, java.io.Serializable, Cloneable   {
@@ -14782,4 +14855,496 @@ public class TTransactionServer {
 
   }
 
+  public static class pruneNow_args implements org.apache.thrift.TBase<pruneNow_args, pruneNow_args._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("pruneNow_args");
+
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new pruneNow_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new pruneNow_argsTupleSchemeFactory());
+    }
+
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(pruneNow_args.class, metaDataMap);
+    }
+
+    public pruneNow_args() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public pruneNow_args(pruneNow_args other) {
+    }
+
+    public pruneNow_args deepCopy() {
+      return new pruneNow_args(this);
+    }
+
+    @Override
+    public void clear() {
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof pruneNow_args)
+        return this.equals((pruneNow_args)that);
+      return false;
+    }
+
+    public boolean equals(pruneNow_args that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(pruneNow_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      pruneNow_args typedOther = (pruneNow_args)other;
+
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("pruneNow_args(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class pruneNow_argsStandardSchemeFactory implements SchemeFactory {
+      public pruneNow_argsStandardScheme getScheme() {
+        return new pruneNow_argsStandardScheme();
+      }
+    }
+
+    private static class pruneNow_argsStandardScheme extends StandardScheme<pruneNow_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, pruneNow_args struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, pruneNow_args struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class pruneNow_argsTupleSchemeFactory implements SchemeFactory {
+      public pruneNow_argsTupleScheme getScheme() {
+        return new pruneNow_argsTupleScheme();
+      }
+    }
+
+    private static class pruneNow_argsTupleScheme extends TupleScheme<pruneNow_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, pruneNow_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, pruneNow_args struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+      }
+    }
+
+  }
+
+  public static class pruneNow_result implements org.apache.thrift.TBase<pruneNow_result, pruneNow_result._Fields>, java.io.Serializable, Cloneable   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("pruneNow_result");
+
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new pruneNow_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new pruneNow_resultTupleSchemeFactory());
+    }
+
+
+    /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+      private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+    public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(pruneNow_result.class, metaDataMap);
+    }
+
+    public pruneNow_result() {
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public pruneNow_result(pruneNow_result other) {
+    }
+
+    public pruneNow_result deepCopy() {
+      return new pruneNow_result(this);
+    }
+
+    @Override
+    public void clear() {
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof pruneNow_result)
+        return this.equals((pruneNow_result)that);
+      return false;
+    }
+
+    public boolean equals(pruneNow_result that) {
+      if (that == null)
+        return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return 0;
+    }
+
+    public int compareTo(pruneNow_result other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+      pruneNow_result typedOther = (pruneNow_result)other;
+
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+      }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("pruneNow_result(");
+      boolean first = true;
+
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class pruneNow_resultStandardSchemeFactory implements SchemeFactory {
+      public pruneNow_resultStandardScheme getScheme() {
+        return new pruneNow_resultStandardScheme();
+      }
+    }
+
+    private static class pruneNow_resultStandardScheme extends StandardScheme<pruneNow_result> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, pruneNow_result struct) throws org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, pruneNow_result struct) throws org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class pruneNow_resultTupleSchemeFactory implements SchemeFactory {
+      public pruneNow_resultTupleScheme getScheme() {
+        return new pruneNow_resultTupleScheme();
+      }
+    }
+
+    private static class pruneNow_resultTupleScheme extends TupleScheme<pruneNow_result> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, pruneNow_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, pruneNow_result struct) throws org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+      }
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
index c8bf22a..0a8ed96 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
@@ -140,4 +140,9 @@ public class DetachedTxSystemClient implements TransactionSystemClient {
   public int getInvalidSize() {
     return 0;
   }
+
+  @Override
+  public void pruneNow() {
+    // no-op: this client implementation does not trigger any pruning
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
index da38dd2..9719bcc 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
@@ -133,4 +133,10 @@ public class InMemoryTxSystemClient implements TransactionSystemClient {
   public int getInvalidSize() {
     return txManager.getInvalidSize();
   }
+
+
+  @Override
+  public void pruneNow() {
+    // no-op: there is no pruning to trigger for the in-memory client
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
index 2f60225..b54e57f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
@@ -109,4 +109,9 @@ public class MinimalTxSystemClient implements TransactionSystemClient {
   public int getInvalidSize() {
     return 0;
   }
+
+  @Override
+  public void pruneNow() {
+    // no-op: this client implementation does not trigger any pruning
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
index 8d7fe2f..ae22372 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
@@ -50,6 +50,7 @@ public class TransactionPruningService extends AbstractIdleService {
   private final TransactionManager txManager;
   private final long scheduleInterval;
   private final boolean pruneEnabled;
+  private TransactionPruningRunnable pruneRunnable;
   private ScheduledExecutorService scheduledExecutorService;
 
   public TransactionPruningService(Configuration conf, TransactionManager txManager) {
@@ -81,9 +82,8 @@ public class TransactionPruningService extends AbstractIdleService {
     long txPruneBufferMillis =
       TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD,
                                             TxConstants.TransactionPruning.DEFAULT_PRUNE_GRACE_PERIOD));
-    scheduledExecutorService.scheduleAtFixedRate(
-      getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis),
-      scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
+    pruneRunnable = getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);
+    scheduledExecutorService.scheduleAtFixedRate(pruneRunnable, scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
     LOG.info("Scheduled {} plugins with interval {} seconds", plugins.size(), scheduleInterval);
   }
 
@@ -104,6 +104,19 @@ public class TransactionPruningService extends AbstractIdleService {
     LOG.info("Stopped {}", this.getClass().getSimpleName());
   }
 
+  /**
+   * Requests a pruning run outside of the regular schedule, or only logs the request if pruning is disabled.
+   * A run that is already in progress is not interrupted; the requested run starts once it has finished.
+   */
+  public void pruneNow() {
+    if (!pruneEnabled) {
+      LOG.info("Request to trigger transaction pruning received but pruning is not enabled.");
+      return;
+    }
+    scheduledExecutorService.execute(pruneRunnable);
+    LOG.info("Triggered invalid transaction pruning due to request received.");
+  }
+
   @VisibleForTesting
   TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager,
                                                 Map<String, TransactionPruningPlugin> plugins,

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index f4460e5..0e05244 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -86,4 +86,5 @@ service TTransactionServer {
   TBoolean truncateInvalidTxBefore(1: i64 time) throws (1: TInvalidTruncateTimeException e),
   i32 invalidTxSize(),
   TTransaction checkpoint(1: TTransaction tx) throws (1: TTransactionNotInProgressException e),
+  void pruneNow(),
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
index 7fca246..f4437c2 100644
--- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
@@ -20,7 +20,9 @@ package org.apache.tephra;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.Provider;
 import com.google.inject.Scopes;
 import com.google.inject.util.Modules;
 import org.apache.hadoop.conf.Configuration;
@@ -34,14 +36,18 @@ import org.apache.tephra.runtime.DiscoveryModules;
 import org.apache.tephra.runtime.TransactionClientModule;
 import org.apache.tephra.runtime.TransactionModules;
 import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.txprune.TransactionPruningService;
 import org.apache.tephra.util.Tests;
+import org.apache.twill.discovery.DiscoveryService;
 import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClient;
 import org.apache.twill.zookeeper.ZKClientService;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,6 +64,8 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
   private static TransactionStateStorage storage;
   private static TransactionSystemClient txClient;
 
+  private static AtomicInteger pruneRuns = new AtomicInteger();
+
   @ClassRule
   public static TemporaryFolder tmpFolder = new TemporaryFolder();
   
@@ -83,6 +91,7 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
           @Override
           protected void configure() {
             bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+            bind(TransactionService.class).to(TestTransactionService.class).in(Scopes.SINGLETON);
           }
         }),
       new TransactionClientModule()
@@ -150,6 +159,13 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
     Assert.assertEquals(0, CountingRetryStrategyProvider.retries.get());
   }
 
+  @Test
+  public void testPruneNow() {
+    int runs = pruneRuns.get(); // baseline: the counter is static and may already be non-zero
+    txClient.pruneNow();
+    Assert.assertEquals(runs + 1, pruneRuns.get()); // JUnit order is (expected, actual): exactly one new request
+  }
+
   // implements a retry strategy that lets us verify how many times it retried
   public static class CountingRetryStrategyProvider extends RetryNTimes.Provider {
 
@@ -171,4 +187,23 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
       };
     }
   }
+
+  public static class TestTransactionService extends TransactionService {
+    @Inject
+    public TestTransactionService(Configuration conf, ZKClient zkClient,
+                                  DiscoveryService discoveryService,
+                                  Provider<TransactionManager> txManagerProvider) {
+      super(conf, zkClient, discoveryService, txManagerProvider);
+    }
+    // Supplies a pruning service whose pruneNow() only increments the static pruneRuns counter.
+    @Override
+    protected TransactionPruningService createPruningService(Configuration conf, TransactionManager txManager) {
+      return new TransactionPruningService(conf, txManager) {
+        @Override
+        public void pruneNow() {
+          pruneRuns.incrementAndGet();
+        }
+      };
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
index 2a0a17e..c8734d7 100644
--- a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.tephra.txprune;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -67,7 +68,6 @@ public class TransactionPruningServiceTest {
              "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2");
     // Setup schedule to run every second
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
-    conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
     conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);
     conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0);
 
@@ -96,7 +96,11 @@ public class TransactionPruningServiceTest {
     pruningService.startAndWait();
     // This will cause the pruning run to happen three times,
     // but we are interested in only first two runs for the assertions later
-    TimeUnit.SECONDS.sleep(3);
+    int pruneRuns = TestTransactionPruningRunnable.getRuns();
+    pruningService.pruneNow();
+    pruningService.pruneNow();
+    pruningService.pruneNow();
+    TestTransactionPruningRunnable.waitForRuns(pruneRuns + 3, 5, TimeUnit.SECONDS); // upper bound, returns early
     pruningService.stopAndWait();
 
     // Assert inactive transaction bound that the plugins receive.
@@ -131,7 +135,6 @@ public class TransactionPruningServiceTest {
              "org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2");
     // Setup schedule to run every second
     conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
-    conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
     conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);
     conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0);
 
@@ -160,7 +163,11 @@ public class TransactionPruningServiceTest {
     pruningService.startAndWait();
     // This will cause the pruning run to happen three times,
     // but we are interested in only first two runs for the assertions later
-    TimeUnit.SECONDS.sleep(3);
+    int pruneRuns = TestTransactionPruningRunnable.getRuns();
+    pruningService.pruneNow();
+    pruningService.pruneNow();
+    pruningService.pruneNow();
+    TestTransactionPruningRunnable.waitForRuns(pruneRuns + 3, 5, TimeUnit.SECONDS); // upper bound, returns early
     pruningService.stopAndWait();
 
     // Assert inactive transaction bound
@@ -233,6 +240,7 @@ public class TransactionPruningServiceTest {
    * Extends {@link TransactionPruningRunnable} to use mock time to help in testing.
    */
   private static class TestTransactionPruningRunnable extends TransactionPruningRunnable {
+    private static volatile int pruneRuns = 0; // incremented in run(), polled from the test thread in waitForRuns()
     private static Iterator<Long> currentTime;
     TestTransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins,
                                    long txMaxLifetimeMillis, long txPruneBufferMillis) {
@@ -247,6 +255,25 @@ public class TransactionPruningServiceTest {
     static void setCurrentTime(Iterator<Long> currentTime) {
       TestTransactionPruningRunnable.currentTime = currentTime;
     }
+
+    @Override
+    public void run() {
+      super.run();
+      pruneRuns++; // counted only after super.run() has returned
+    }
+
+    private static int getRuns() {
+      return pruneRuns; // value of the static run counter at the time of the call
+    }
+
+    public static void waitForRuns(int runs, int timeout, TimeUnit unit) throws Exception {
+      // Polls every 100 ms until pruneRuns reaches 'runs' or the timeout elapses; a timeout is not reported.
+      final long limitMillis = unit.toMillis(timeout);
+      final Stopwatch elapsed = new Stopwatch().start();
+      while (pruneRuns < runs && elapsed.elapsedMillis() < limitMillis) {
+        Thread.sleep(100);
+      }
+    }
   }
 
   /**