You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by an...@apache.org on 2017/09/09 05:54:04 UTC
incubator-tephra git commit: (TEPHRA-250) Allow to trigger
transaction pruning
Repository: incubator-tephra
Updated Branches:
refs/heads/master 39f4fde63 -> 8532076f8
(TEPHRA-250) Allow to trigger transaction pruning
This closes #52
Signed-off-by: anew <an...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/8532076f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/8532076f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/8532076f
Branch: refs/heads/master
Commit: 8532076f802508a4a761408a7501c82463dd3ad5
Parents: 39f4fde
Author: anew <an...@apache.org>
Authored: Fri Sep 8 16:36:29 2017 -0700
Committer: anew <an...@apache.org>
Committed: Fri Sep 8 22:52:36 2017 -0700
----------------------------------------------------------------------
.../apache/tephra/TransactionSystemClient.java | 5 +
.../tephra/distributed/TransactionService.java | 28 +-
.../distributed/TransactionServiceClient.java | 18 +
.../TransactionServiceThriftClient.java | 9 +
.../TransactionServiceThriftHandler.java | 12 +-
.../distributed/thrift/TTransactionServer.java | 565 +++++++++++++++++++
.../tephra/inmemory/DetachedTxSystemClient.java | 5 +
.../tephra/inmemory/InMemoryTxSystemClient.java | 6 +
.../tephra/inmemory/MinimalTxSystemClient.java | 5 +
.../txprune/TransactionPruningService.java | 19 +-
tephra-core/src/main/thrift/transaction.thrift | 1 +
.../tephra/ThriftTransactionSystemTest.java | 35 ++
.../txprune/TransactionPruningServiceTest.java | 35 +-
13 files changed, 730 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
index fe4b63e..9702c61 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java
@@ -140,4 +140,9 @@ public interface TransactionSystemClient {
* @return the size of invalid list
*/
int getInvalidSize();
+
+ /**
+ * Trigger transaction pruning now.
+ */
+ void pruneNow();
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
index f95e5b3..b3a9ae7 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java
@@ -48,9 +48,9 @@ import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
/**
- *
+ * Transaction Service that includes Transaction Manager, Thrift Server, and Pruning Service.
*/
-public final class TransactionService extends InMemoryTransactionService {
+public class TransactionService extends InMemoryTransactionService {
private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class);
private LeaderElection leaderElection;
private final Configuration conf;
@@ -97,7 +97,7 @@ public final class TransactionService extends InMemoryTransactionService {
}
}, MoreExecutors.sameThreadExecutor());
- pruningService = new TransactionPruningService(conf, txManager);
+ pruningService = createPruningService(conf, txManager);
server = ThriftRPCServer.builder(TTransactionServer.class)
.setHost(address)
@@ -105,7 +105,7 @@ public final class TransactionService extends InMemoryTransactionService {
.setWorkerThreads(threads)
.setMaxReadBufferBytes(maxReadBufferBytes)
.setIOThreads(ioThreads)
- .build(new TransactionServiceThriftHandler(txManager));
+ .build(new TransactionServiceThriftHandler(txManager, pruningService));
try {
server.startAndWait();
pruningService.startAndWait();
@@ -142,6 +142,14 @@ public final class TransactionService extends InMemoryTransactionService {
notifyStarted();
}
+ /**
+ * Called at startup to create the pruning service.
+ */
+ @VisibleForTesting
+ protected TransactionPruningService createPruningService(Configuration conf, TransactionManager txManager) {
+ return new TransactionPruningService(conf, txManager);
+ }
+
@VisibleForTesting
State thriftRPCServerState() {
return server.state();
@@ -179,4 +187,16 @@ public final class TransactionService extends InMemoryTransactionService {
public TransactionManager getTransactionManager() {
return txManager;
}
+
+ /**
+ * This allows systems that embed the transaction service access to the pruning service,
+ * so that they can trigger prunning (rather than waiting for its scheduled run time).
+ *
+ * @return null if pruning is not enabled
+ */
+ @SuppressWarnings({"WeakerAccess", "unused"})
+ @Nullable
+ public TransactionPruningService getTransactionPruningService() {
+ return pruningService;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
index 5f7792a..cdcca7f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
@@ -488,4 +488,22 @@ public class TransactionServiceClient implements TransactionSystemClient {
throw Throwables.propagate(e);
}
}
+
+ @Override
+ public void pruneNow() {
+ try {
+ this.execute(
+ new Operation<Void>("pruneNow") {
+ @Override
+ public Void execute(TransactionServiceThriftClient client)
+ throws TException {
+ client.pruneNow();
+ return null;
+ }
+ });
+ } catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
index 8ba81e3..ccd266a 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
@@ -302,6 +302,15 @@ public class TransactionServiceThriftClient {
}
}
+ public void pruneNow() throws TException {
+ try {
+ client.pruneNow();
+ } catch (TException e) {
+ isValid.set(false);
+ throw e;
+ }
+ }
+
public boolean isValid() {
return isValid.get();
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
index 954ee1d..174b463 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
@@ -31,6 +31,7 @@ import org.apache.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotExce
import org.apache.tephra.distributed.thrift.TTransactionNotInProgressException;
import org.apache.tephra.distributed.thrift.TTransactionServer;
import org.apache.tephra.rpc.RPCServiceHandler;
+import org.apache.tephra.txprune.TransactionPruningService;
import org.apache.thrift.TException;
import java.io.ByteArrayOutputStream;
@@ -57,9 +58,11 @@ import java.util.Set;
public class TransactionServiceThriftHandler implements TTransactionServer.Iface, RPCServiceHandler {
private final TransactionManager txManager;
+ private final TransactionPruningService pruningService;
- public TransactionServiceThriftHandler(TransactionManager txManager) {
+ public TransactionServiceThriftHandler(TransactionManager txManager, TransactionPruningService pruningService) {
this.txManager = txManager;
+ this.pruningService = pruningService;
}
@Override
@@ -206,7 +209,12 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface
}
}
- /* RPCServiceHandler implementation */
+ @Override
+ public void pruneNow() throws TException {
+ pruningService.pruneNow();
+ }
+
+/* RPCServiceHandler implementation */
@Override
public void init() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
index 634350d..6c99bb4 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
@@ -88,6 +88,8 @@ public class TTransactionServer {
public TTransaction checkpoint(TTransaction tx) throws TTransactionNotInProgressException, org.apache.thrift.TException;
+ public void pruneNow() throws org.apache.thrift.TException;
+
}
public interface AsyncIface {
@@ -128,6 +130,8 @@ public class TTransactionServer {
public void checkpoint(TTransaction tx, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.checkpoint_call> resultHandler) throws org.apache.thrift.TException;
+ public void pruneNow(org.apache.thrift.async.AsyncMethodCallback<AsyncClient.pruneNow_call> resultHandler) throws org.apache.thrift.TException;
+
}
public static class Client extends org.apache.thrift.TServiceClient implements Iface {
@@ -581,6 +585,25 @@ public class TTransactionServer {
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "checkpoint failed: unknown result");
}
+ public void pruneNow() throws org.apache.thrift.TException
+ {
+ send_pruneNow();
+ recv_pruneNow();
+ }
+
+ public void send_pruneNow() throws org.apache.thrift.TException
+ {
+ pruneNow_args args = new pruneNow_args();
+ sendBase("pruneNow", args);
+ }
+
+ public void recv_pruneNow() throws org.apache.thrift.TException
+ {
+ pruneNow_result result = new pruneNow_result();
+ receiveBase(result, "pruneNow");
+ return;
+ }
+
}
public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
@@ -1163,6 +1186,35 @@ public class TTransactionServer {
}
}
+ public void pruneNow(org.apache.thrift.async.AsyncMethodCallback<pruneNow_call> resultHandler) throws org.apache.thrift.TException {
+ checkReady();
+ pruneNow_call method_call = new pruneNow_call(resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class pruneNow_call extends org.apache.thrift.async.TAsyncMethodCall {
+ public pruneNow_call(org.apache.thrift.async.AsyncMethodCallback<pruneNow_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+ prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("pruneNow", org.apache.thrift.protocol.TMessageType.CALL, 0));
+ pruneNow_args args = new pruneNow_args();
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public void getResult() throws org.apache.thrift.TException {
+ if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ (new Client(prot)).recv_pruneNow();
+ }
+ }
+
}
public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
@@ -1194,6 +1246,7 @@ public class TTransactionServer {
processMap.put("truncateInvalidTxBefore", new truncateInvalidTxBefore());
processMap.put("invalidTxSize", new invalidTxSize());
processMap.put("checkpoint", new checkpoint());
+ processMap.put("pruneNow", new pruneNow());
return processMap;
}
@@ -1595,6 +1648,26 @@ public class TTransactionServer {
}
}
+ public static class pruneNow<I extends Iface> extends org.apache.thrift.ProcessFunction<I, pruneNow_args> {
+ public pruneNow() {
+ super("pruneNow");
+ }
+
+ public pruneNow_args getEmptyArgsInstance() {
+ return new pruneNow_args();
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public pruneNow_result getResult(I iface, pruneNow_args args) throws org.apache.thrift.TException {
+ pruneNow_result result = new pruneNow_result();
+ iface.pruneNow();
+ return result;
+ }
+ }
+
}
public static class startLong_args implements org.apache.thrift.TBase<startLong_args, startLong_args._Fields>, java.io.Serializable, Cloneable {
@@ -14782,4 +14855,496 @@ public class TTransactionServer {
}
+ public static class pruneNow_args implements org.apache.thrift.TBase<pruneNow_args, pruneNow_args._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("pruneNow_args");
+
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new pruneNow_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new pruneNow_argsTupleSchemeFactory());
+ }
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(pruneNow_args.class, metaDataMap);
+ }
+
+ public pruneNow_args() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public pruneNow_args(pruneNow_args other) {
+ }
+
+ public pruneNow_args deepCopy() {
+ return new pruneNow_args(this);
+ }
+
+ @Override
+ public void clear() {
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof pruneNow_args)
+ return this.equals((pruneNow_args)that);
+ return false;
+ }
+
+ public boolean equals(pruneNow_args that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(pruneNow_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ pruneNow_args typedOther = (pruneNow_args)other;
+
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("pruneNow_args(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class pruneNow_argsStandardSchemeFactory implements SchemeFactory {
+ public pruneNow_argsStandardScheme getScheme() {
+ return new pruneNow_argsStandardScheme();
+ }
+ }
+
+ private static class pruneNow_argsStandardScheme extends StandardScheme<pruneNow_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, pruneNow_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, pruneNow_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class pruneNow_argsTupleSchemeFactory implements SchemeFactory {
+ public pruneNow_argsTupleScheme getScheme() {
+ return new pruneNow_argsTupleScheme();
+ }
+ }
+
+ private static class pruneNow_argsTupleScheme extends TupleScheme<pruneNow_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, pruneNow_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, pruneNow_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ }
+ }
+
+ }
+
+ public static class pruneNow_result implements org.apache.thrift.TBase<pruneNow_result, pruneNow_result._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("pruneNow_result");
+
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new pruneNow_resultStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new pruneNow_resultTupleSchemeFactory());
+ }
+
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+;
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(pruneNow_result.class, metaDataMap);
+ }
+
+ public pruneNow_result() {
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public pruneNow_result(pruneNow_result other) {
+ }
+
+ public pruneNow_result deepCopy() {
+ return new pruneNow_result(this);
+ }
+
+ @Override
+ public void clear() {
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof pruneNow_result)
+ return this.equals((pruneNow_result)that);
+ return false;
+ }
+
+ public boolean equals(pruneNow_result that) {
+ if (that == null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(pruneNow_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ pruneNow_result typedOther = (pruneNow_result)other;
+
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("pruneNow_result(");
+ boolean first = true;
+
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class pruneNow_resultStandardSchemeFactory implements SchemeFactory {
+ public pruneNow_resultStandardScheme getScheme() {
+ return new pruneNow_resultStandardScheme();
+ }
+ }
+
+ private static class pruneNow_resultStandardScheme extends StandardScheme<pruneNow_result> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, pruneNow_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, pruneNow_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class pruneNow_resultTupleSchemeFactory implements SchemeFactory {
+ public pruneNow_resultTupleScheme getScheme() {
+ return new pruneNow_resultTupleScheme();
+ }
+ }
+
+ private static class pruneNow_resultTupleScheme extends TupleScheme<pruneNow_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, pruneNow_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, pruneNow_result struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ }
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
index c8bf22a..0a8ed96 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/DetachedTxSystemClient.java
@@ -140,4 +140,9 @@ public class DetachedTxSystemClient implements TransactionSystemClient {
public int getInvalidSize() {
return 0;
}
+
+ @Override
+ public void pruneNow() {
+ // do nothing
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
index da38dd2..9719bcc 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/InMemoryTxSystemClient.java
@@ -133,4 +133,10 @@ public class InMemoryTxSystemClient implements TransactionSystemClient {
public int getInvalidSize() {
return txManager.getInvalidSize();
}
+
+
+ @Override
+ public void pruneNow() {
+ // no-op: no pruning in-memory
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
index 2f60225..b54e57f 100644
--- a/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/inmemory/MinimalTxSystemClient.java
@@ -109,4 +109,9 @@ public class MinimalTxSystemClient implements TransactionSystemClient {
public int getInvalidSize() {
return 0;
}
+
+ @Override
+ public void pruneNow() {
+ // do nothing
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
index 8d7fe2f..ae22372 100644
--- a/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
+++ b/tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningService.java
@@ -50,6 +50,7 @@ public class TransactionPruningService extends AbstractIdleService {
private final TransactionManager txManager;
private final long scheduleInterval;
private final boolean pruneEnabled;
+ private TransactionPruningRunnable pruneRunnable;
private ScheduledExecutorService scheduledExecutorService;
public TransactionPruningService(Configuration conf, TransactionManager txManager) {
@@ -81,9 +82,8 @@ public class TransactionPruningService extends AbstractIdleService {
long txPruneBufferMillis =
TimeUnit.SECONDS.toMillis(conf.getLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD,
TxConstants.TransactionPruning.DEFAULT_PRUNE_GRACE_PERIOD));
- scheduledExecutorService.scheduleAtFixedRate(
- getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis),
- scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
+ pruneRunnable = getTxPruneRunnable(txManager, plugins, txMaxLifetimeMillis, txPruneBufferMillis);
+ scheduledExecutorService.scheduleAtFixedRate(pruneRunnable, scheduleInterval, scheduleInterval, TimeUnit.SECONDS);
LOG.info("Scheduled {} plugins with interval {} seconds", plugins.size(), scheduleInterval);
}
@@ -104,6 +104,19 @@ public class TransactionPruningService extends AbstractIdleService {
LOG.info("Stopped {}", this.getClass().getSimpleName());
}
+ /**
+ * Trigger a run of the transaction pruning. It will run as soon as no pruning is running. That is,
+ * if pruning is running at this moment, then another will start after it is done.
+ */
+ public void pruneNow() {
+ if (pruneEnabled) {
+ scheduledExecutorService.execute(pruneRunnable);
+ LOG.info("Triggered invalid transaction pruning due to request received.");
+ } else {
+ LOG.info("Request to trigger transaction pruning received but pruning is not enabled.");
+ }
+ }
+
@VisibleForTesting
TransactionPruningRunnable getTxPruneRunnable(TransactionManager txManager,
Map<String, TransactionPruningPlugin> plugins,
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index f4460e5..0e05244 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -86,4 +86,5 @@ service TTransactionServer {
TBoolean truncateInvalidTxBefore(1: i64 time) throws (1: TInvalidTruncateTimeException e),
i32 invalidTxSize(),
TTransaction checkpoint(1: TTransaction tx) throws (1: TTransactionNotInProgressException e),
+ void pruneNow(),
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
index 7fca246..f4437c2 100644
--- a/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/ThriftTransactionSystemTest.java
@@ -20,7 +20,9 @@ package org.apache.tephra;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
+import com.google.inject.Inject;
import com.google.inject.Injector;
+import com.google.inject.Provider;
import com.google.inject.Scopes;
import com.google.inject.util.Modules;
import org.apache.hadoop.conf.Configuration;
@@ -34,14 +36,18 @@ import org.apache.tephra.runtime.DiscoveryModules;
import org.apache.tephra.runtime.TransactionClientModule;
import org.apache.tephra.runtime.TransactionModules;
import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.txprune.TransactionPruningService;
import org.apache.tephra.util.Tests;
+import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
+import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +64,8 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
private static TransactionStateStorage storage;
private static TransactionSystemClient txClient;
+ private static AtomicInteger pruneRuns = new AtomicInteger();
+
@ClassRule
public static TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -83,6 +91,7 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
@Override
protected void configure() {
bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+ bind(TransactionService.class).to(TestTransactionService.class).in(Scopes.SINGLETON);
}
}),
new TransactionClientModule()
@@ -150,6 +159,13 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
Assert.assertEquals(0, CountingRetryStrategyProvider.retries.get());
}
+ @Test
+ public void testPruneNow() {
+ int runs = pruneRuns.get();
+ txClient.pruneNow();
+ Assert.assertEquals(pruneRuns.get(), runs + 1);
+ }
+
// implements a retry strategy that lets us verify how many times it retried
public static class CountingRetryStrategyProvider extends RetryNTimes.Provider {
@@ -171,4 +187,23 @@ public class ThriftTransactionSystemTest extends TransactionSystemTest {
};
}
}
+
+ public static class TestTransactionService extends TransactionService {
+ @Inject
+ public TestTransactionService(Configuration conf, ZKClient zkClient,
+ DiscoveryService discoveryService,
+ Provider<TransactionManager> txManagerProvider) {
+ super(conf, zkClient, discoveryService, txManagerProvider);
+ }
+
+ @Override
+ protected TransactionPruningService createPruningService(Configuration conf, TransactionManager txManager) {
+ return new TransactionPruningService(conf, txManager) {
+ @Override
+ public void pruneNow() {
+ pruneRuns.incrementAndGet();
+ }
+ };
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/8532076f/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
index 2a0a17e..c8734d7 100644
--- a/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/txprune/TransactionPruningServiceTest.java
@@ -19,6 +19,7 @@
package org.apache.tephra.txprune;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
@@ -67,7 +68,6 @@ public class TransactionPruningServiceTest {
"org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2");
// Setup schedule to run every second
conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
- conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);
conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0);
@@ -96,7 +96,11 @@ public class TransactionPruningServiceTest {
pruningService.startAndWait();
// This will cause the pruning run to happen three times,
// but we are interested in only first two runs for the assertions later
- TimeUnit.SECONDS.sleep(3);
+ int pruneRuns = TestTransactionPruningRunnable.getRuns();
+ pruningService.pruneNow();
+ pruningService.pruneNow();
+ pruningService.pruneNow();
+ TestTransactionPruningRunnable.waitForRuns(pruneRuns + 3, 5, TimeUnit.MILLISECONDS);
pruningService.stopAndWait();
// Assert inactive transaction bound that the plugins receive.
@@ -131,7 +135,6 @@ public class TransactionPruningServiceTest {
"org.apache.tephra.txprune.TransactionPruningServiceTest$MockPlugin2");
// Setup schedule to run every second
conf.setBoolean(TxConstants.TransactionPruning.PRUNE_ENABLE, true);
- conf.setInt(TxConstants.TransactionPruning.PRUNE_INTERVAL, 1);
conf.setInt(TxConstants.Manager.CFG_TX_MAX_LIFETIME, 10);
conf.setLong(TxConstants.TransactionPruning.PRUNE_GRACE_PERIOD, 0);
@@ -160,7 +163,11 @@ public class TransactionPruningServiceTest {
pruningService.startAndWait();
// This will cause the pruning run to happen three times,
// but we are interested in only first two runs for the assertions later
- TimeUnit.SECONDS.sleep(3);
+ int pruneRuns = TestTransactionPruningRunnable.getRuns();
+ pruningService.pruneNow();
+ pruningService.pruneNow();
+ pruningService.pruneNow();
+ TestTransactionPruningRunnable.waitForRuns(pruneRuns + 3, 5, TimeUnit.MILLISECONDS);
pruningService.stopAndWait();
// Assert inactive transaction bound
@@ -233,6 +240,7 @@ public class TransactionPruningServiceTest {
* Extends {@link TransactionPruningRunnable} to use mock time to help in testing.
*/
private static class TestTransactionPruningRunnable extends TransactionPruningRunnable {
+ private static int pruneRuns = 0;
private static Iterator<Long> currentTime;
TestTransactionPruningRunnable(TransactionManager txManager, Map<String, TransactionPruningPlugin> plugins,
long txMaxLifetimeMillis, long txPruneBufferMillis) {
@@ -247,6 +255,25 @@ public class TransactionPruningServiceTest {
static void setCurrentTime(Iterator<Long> currentTime) {
TestTransactionPruningRunnable.currentTime = currentTime;
}
+
+ @Override
+ public void run() {
+ super.run();
+ pruneRuns++;
+ }
+
+ private static int getRuns() {
+ return pruneRuns;
+ }
+
+ public static void waitForRuns(int runs, int timeout, TimeUnit unit) throws Exception {
+ long timeoutMillis = unit.toMillis(timeout);
+ Stopwatch stopWatch = new Stopwatch();
+ stopWatch.start();
+ while (pruneRuns < runs && stopWatch.elapsedMillis() < timeoutMillis) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ }
}
/**