You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:20:51 UTC
[05/34] accumulo git commit: ACCUMULO-3423 respond to elserj's review
ACCUMULO-3423 respond to elserj's review
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c8f3b7d3
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c8f3b7d3
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c8f3b7d3
Branch: refs/heads/master
Commit: c8f3b7d3b8591ea2330437549b9578ae9feebaaf
Parents: b2539fb
Author: Eric C. Newton <er...@gmail.com>
Authored: Thu Feb 26 15:17:04 2015 -0500
Committer: Eric C. Newton <er...@gmail.com>
Committed: Thu Feb 26 15:17:04 2015 -0500
----------------------------------------------------------------------
.../accumulo/core/replication/StatusUtil.java | 9 +
.../thrift/TabletClientService.java | 749 ++++++++++++++++++-
core/src/main/thrift/tabletserver.thrift | 1 +
.../server/master/state/MetaDataStateStore.java | 3 +
.../master/state/MetaDataTableScanner.java | 7 +-
.../server/master/state/TabletStateStore.java | 3 +
.../master/state/ZooTabletStateStore.java | 1 +
.../accumulo/server/util/MetadataTableUtil.java | 7 +-
.../gc/GarbageCollectWriteAheadLogs.java | 4 +-
.../accumulo/master/replication/WorkMaker.java | 2 +-
.../apache/accumulo/tserver/TabletServer.java | 5 +
.../tserver/log/TabletServerLogger.java | 42 +-
.../apache/accumulo/tserver/tablet/Tablet.java | 1 +
.../test/performance/thrift/NullTserver.java | 3 +
14 files changed, 793 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
index a7cd3f5..d8ec403 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
@@ -32,6 +32,7 @@ public class StatusUtil {
private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE;
private static final Status.Builder CREATED_STATUS_BUILDER;
+ private static final Status.Builder INF_END_REPLICATION_STATUS_BUILDER;
static {
CREATED_STATUS_BUILDER = Status.newBuilder();
@@ -45,6 +46,7 @@ public class StatusUtil {
builder.setEnd(0);
builder.setInfiniteEnd(true);
builder.setClosed(false);
+ INF_END_REPLICATION_STATUS_BUILDER = builder;
INF_END_REPLICATION_STATUS = builder.build();
INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS);
@@ -153,6 +155,13 @@ public class StatusUtil {
/**
* @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
*/
+ public static Status openWithUnknownLength(long timeCreated) {
+ return INF_END_REPLICATION_STATUS.toBuilder().setCreatedTime(timeCreated).build(); // fresh builder per call: mutating a shared static builder would race between threads
+ }
+
+ /**
+ * @return A {@link Status} for an open file of unspecified length, all of which needs replicating.
+ */
public static Status openWithUnknownLength() {
return INF_END_REPLICATION_STATUS;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
index d6d4afd..02bd4e1 100644
--- a/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
+++ b/core/src/main/java/org/apache/accumulo/core/tabletserver/thrift/TabletClientService.java
@@ -110,6 +110,8 @@ import org.slf4j.LoggerFactory;
public List<ActiveCompaction> getActiveCompactions(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException, org.apache.thrift.TException;
+ public void removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames) throws org.apache.thrift.TException;
+
public List<String> getActiveLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException;
}
@@ -174,6 +176,8 @@ import org.slf4j.LoggerFactory;
public void getActiveCompactions(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+
public void getActiveLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
}
@@ -896,6 +900,20 @@ import org.slf4j.LoggerFactory;
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getActiveCompactions failed: unknown result");
}
+ public void removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames) throws org.apache.thrift.TException
+ {
+ send_removeLogs(tinfo, credentials, filenames);
+ }
+
+ public void send_removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames) throws org.apache.thrift.TException
+ {
+ removeLogs_args args = new removeLogs_args();
+ args.setTinfo(tinfo);
+ args.setCredentials(credentials);
+ args.setFilenames(filenames);
+ sendBase("removeLogs", args);
+ }
+
public List<String> getActiveLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException
{
send_getActiveLogs(tinfo, credentials);
@@ -2099,6 +2117,43 @@ import org.slf4j.LoggerFactory;
}
}
+ public void removeLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ checkReady();
+ removeLogs_call method_call = new removeLogs_call(tinfo, credentials, filenames, resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class removeLogs_call extends org.apache.thrift.async.TAsyncMethodCall {
+ private org.apache.accumulo.core.trace.thrift.TInfo tinfo;
+ private org.apache.accumulo.core.security.thrift.TCredentials credentials;
+ private List<String> filenames;
+ public removeLogs_call(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, List<String> filenames, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, true);
+ this.tinfo = tinfo;
+ this.credentials = credentials;
+ this.filenames = filenames;
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+ prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("removeLogs", org.apache.thrift.protocol.TMessageType.CALL, 0));
+ removeLogs_args args = new removeLogs_args();
+ args.setTinfo(tinfo);
+ args.setCredentials(credentials);
+ args.setFilenames(filenames);
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public void getResult() throws org.apache.thrift.TException {
+ if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ }
+ }
+
public void getActiveLogs(org.apache.accumulo.core.trace.thrift.TInfo tinfo, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
getActiveLogs_call method_call = new getActiveLogs_call(tinfo, credentials, resultHandler, this, ___protocolFactory, ___transport);
@@ -2176,6 +2231,7 @@ import org.slf4j.LoggerFactory;
processMap.put("fastHalt", new fastHalt());
processMap.put("getActiveScans", new getActiveScans());
processMap.put("getActiveCompactions", new getActiveCompactions());
+ processMap.put("removeLogs", new removeLogs());
processMap.put("getActiveLogs", new getActiveLogs());
return processMap;
}
@@ -2837,6 +2893,25 @@ import org.slf4j.LoggerFactory;
}
}
+ public static class removeLogs<I extends Iface> extends org.apache.thrift.ProcessFunction<I, removeLogs_args> {
+ public removeLogs() {
+ super("removeLogs");
+ }
+
+ public removeLogs_args getEmptyArgsInstance() {
+ return new removeLogs_args();
+ }
+
+ protected boolean isOneway() {
+ return true;
+ }
+
+ public org.apache.thrift.TBase getResult(I iface, removeLogs_args args) throws org.apache.thrift.TException {
+ iface.removeLogs(args.tinfo, args.credentials, args.filenames);
+ return null;
+ }
+ }
+
public static class getActiveLogs<I extends Iface> extends org.apache.thrift.ProcessFunction<I, getActiveLogs_args> {
public getActiveLogs() {
super("getActiveLogs");
@@ -2899,6 +2974,7 @@ import org.slf4j.LoggerFactory;
processMap.put("fastHalt", new fastHalt());
processMap.put("getActiveScans", new getActiveScans());
processMap.put("getActiveCompactions", new getActiveCompactions());
+ processMap.put("removeLogs", new removeLogs());
processMap.put("getActiveLogs", new getActiveLogs());
return processMap;
}
@@ -4291,6 +4367,34 @@ import org.slf4j.LoggerFactory;
}
}
+ public static class removeLogs<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, removeLogs_args, Void> {
+ public removeLogs() {
+ super("removeLogs");
+ }
+
+ public removeLogs_args getEmptyArgsInstance() {
+ return new removeLogs_args();
+ }
+
+ public AsyncMethodCallback<Void> getResultHandler(final AsyncFrameBuffer fb, final int seqid) {
+ final org.apache.thrift.AsyncProcessFunction fcall = this;
+ return new AsyncMethodCallback<Void>() {
+ public void onComplete(Void o) {
+ }
+ public void onError(Exception e) {
+ }
+ };
+ }
+
+ protected boolean isOneway() {
+ return true;
+ }
+
+ public void start(I iface, removeLogs_args args, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws TException {
+ iface.removeLogs(args.tinfo, args.credentials, args.filenames,resultHandler);
+ }
+ }
+
public static class getActiveLogs<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, getActiveLogs_args, List<String>> {
public getActiveLogs() {
super("getActiveLogs");
@@ -33069,6 +33173,619 @@ import org.slf4j.LoggerFactory;
}
+ public static class removeLogs_args implements org.apache.thrift.TBase<removeLogs_args, removeLogs_args._Fields>, java.io.Serializable, Cloneable, Comparable<removeLogs_args> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("removeLogs_args");
+
+ private static final org.apache.thrift.protocol.TField TINFO_FIELD_DESC = new org.apache.thrift.protocol.TField("tinfo", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+ private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+ private static final org.apache.thrift.protocol.TField FILENAMES_FIELD_DESC = new org.apache.thrift.protocol.TField("filenames", org.apache.thrift.protocol.TType.LIST, (short)3);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new removeLogs_argsStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new removeLogs_argsTupleSchemeFactory());
+ }
+
+ public org.apache.accumulo.core.trace.thrift.TInfo tinfo; // required
+ public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required
+ public List<String> filenames; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ TINFO((short)1, "tinfo"),
+ CREDENTIALS((short)2, "credentials"),
+ FILENAMES((short)3, "filenames");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // TINFO
+ return TINFO;
+ case 2: // CREDENTIALS
+ return CREDENTIALS;
+ case 3: // FILENAMES
+ return FILENAMES;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.TINFO, new org.apache.thrift.meta_data.FieldMetaData("tinfo", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.trace.thrift.TInfo.class)));
+ tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class)));
+ tmpMap.put(_Fields.FILENAMES, new org.apache.thrift.meta_data.FieldMetaData("filenames", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(removeLogs_args.class, metaDataMap);
+ }
+
+ public removeLogs_args() {
+ }
+
+ public removeLogs_args(
+ org.apache.accumulo.core.trace.thrift.TInfo tinfo,
+ org.apache.accumulo.core.security.thrift.TCredentials credentials,
+ List<String> filenames)
+ {
+ this();
+ this.tinfo = tinfo;
+ this.credentials = credentials;
+ this.filenames = filenames;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public removeLogs_args(removeLogs_args other) {
+ if (other.isSetTinfo()) {
+ this.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo(other.tinfo);
+ }
+ if (other.isSetCredentials()) {
+ this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials);
+ }
+ if (other.isSetFilenames()) {
+ List<String> __this__filenames = new ArrayList<String>(other.filenames);
+ this.filenames = __this__filenames;
+ }
+ }
+
+ public removeLogs_args deepCopy() {
+ return new removeLogs_args(this);
+ }
+
+ @Override
+ public void clear() {
+ this.tinfo = null;
+ this.credentials = null;
+ this.filenames = null;
+ }
+
+ public org.apache.accumulo.core.trace.thrift.TInfo getTinfo() {
+ return this.tinfo;
+ }
+
+ public removeLogs_args setTinfo(org.apache.accumulo.core.trace.thrift.TInfo tinfo) {
+ this.tinfo = tinfo;
+ return this;
+ }
+
+ public void unsetTinfo() {
+ this.tinfo = null;
+ }
+
+ /** Returns true if field tinfo is set (has been assigned a value) and false otherwise */
+ public boolean isSetTinfo() {
+ return this.tinfo != null;
+ }
+
+ public void setTinfoIsSet(boolean value) {
+ if (!value) {
+ this.tinfo = null;
+ }
+ }
+
+ public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() {
+ return this.credentials;
+ }
+
+ public removeLogs_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) {
+ this.credentials = credentials;
+ return this;
+ }
+
+ public void unsetCredentials() {
+ this.credentials = null;
+ }
+
+ /** Returns true if field credentials is set (has been assigned a value) and false otherwise */
+ public boolean isSetCredentials() {
+ return this.credentials != null;
+ }
+
+ public void setCredentialsIsSet(boolean value) {
+ if (!value) {
+ this.credentials = null;
+ }
+ }
+
+ public int getFilenamesSize() {
+ return (this.filenames == null) ? 0 : this.filenames.size();
+ }
+
+ public java.util.Iterator<String> getFilenamesIterator() {
+ return (this.filenames == null) ? null : this.filenames.iterator();
+ }
+
+ public void addToFilenames(String elem) {
+ if (this.filenames == null) {
+ this.filenames = new ArrayList<String>();
+ }
+ this.filenames.add(elem);
+ }
+
+ public List<String> getFilenames() {
+ return this.filenames;
+ }
+
+ public removeLogs_args setFilenames(List<String> filenames) {
+ this.filenames = filenames;
+ return this;
+ }
+
+ public void unsetFilenames() {
+ this.filenames = null;
+ }
+
+ /** Returns true if field filenames is set (has been assigned a value) and false otherwise */
+ public boolean isSetFilenames() {
+ return this.filenames != null;
+ }
+
+ public void setFilenamesIsSet(boolean value) {
+ if (!value) {
+ this.filenames = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case TINFO:
+ if (value == null) {
+ unsetTinfo();
+ } else {
+ setTinfo((org.apache.accumulo.core.trace.thrift.TInfo)value);
+ }
+ break;
+
+ case CREDENTIALS:
+ if (value == null) {
+ unsetCredentials();
+ } else {
+ setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value);
+ }
+ break;
+
+ case FILENAMES:
+ if (value == null) {
+ unsetFilenames();
+ } else {
+ setFilenames((List<String>)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case TINFO:
+ return getTinfo();
+
+ case CREDENTIALS:
+ return getCredentials();
+
+ case FILENAMES:
+ return getFilenames();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case TINFO:
+ return isSetTinfo();
+ case CREDENTIALS:
+ return isSetCredentials();
+ case FILENAMES:
+ return isSetFilenames();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof removeLogs_args)
+ return this.equals((removeLogs_args)that);
+ return false;
+ }
+
+ public boolean equals(removeLogs_args that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_tinfo = true && this.isSetTinfo();
+ boolean that_present_tinfo = true && that.isSetTinfo();
+ if (this_present_tinfo || that_present_tinfo) {
+ if (!(this_present_tinfo && that_present_tinfo))
+ return false;
+ if (!this.tinfo.equals(that.tinfo))
+ return false;
+ }
+
+ boolean this_present_credentials = true && this.isSetCredentials();
+ boolean that_present_credentials = true && that.isSetCredentials();
+ if (this_present_credentials || that_present_credentials) {
+ if (!(this_present_credentials && that_present_credentials))
+ return false;
+ if (!this.credentials.equals(that.credentials))
+ return false;
+ }
+
+ boolean this_present_filenames = true && this.isSetFilenames();
+ boolean that_present_filenames = true && that.isSetFilenames();
+ if (this_present_filenames || that_present_filenames) {
+ if (!(this_present_filenames && that_present_filenames))
+ return false;
+ if (!this.filenames.equals(that.filenames))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public int compareTo(removeLogs_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = Boolean.valueOf(isSetTinfo()).compareTo(other.isSetTinfo());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTinfo()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tinfo, other.tinfo);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(other.isSetCredentials());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCredentials()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, other.credentials);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetFilenames()).compareTo(other.isSetFilenames());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetFilenames()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.filenames, other.filenames);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("removeLogs_args(");
+ boolean first = true;
+
+ sb.append("tinfo:");
+ if (this.tinfo == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.tinfo);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("credentials:");
+ if (this.credentials == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.credentials);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("filenames:");
+ if (this.filenames == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.filenames);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ if (tinfo != null) {
+ tinfo.validate();
+ }
+ if (credentials != null) {
+ credentials.validate();
+ }
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class removeLogs_argsStandardSchemeFactory implements SchemeFactory {
+ public removeLogs_argsStandardScheme getScheme() {
+ return new removeLogs_argsStandardScheme();
+ }
+ }
+
+ private static class removeLogs_argsStandardScheme extends StandardScheme<removeLogs_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, removeLogs_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // TINFO
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo();
+ struct.tinfo.read(iprot);
+ struct.setTinfoIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // CREDENTIALS
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+ struct.credentials.read(iprot);
+ struct.setCredentialsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 3: // FILENAMES
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list314 = iprot.readListBegin();
+ struct.filenames = new ArrayList<String>(_list314.size);
+ for (int _i315 = 0; _i315 < _list314.size; ++_i315)
+ {
+ String _elem316;
+ _elem316 = iprot.readString();
+ struct.filenames.add(_elem316);
+ }
+ iprot.readListEnd();
+ }
+ struct.setFilenamesIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, removeLogs_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.tinfo != null) {
+ oprot.writeFieldBegin(TINFO_FIELD_DESC);
+ struct.tinfo.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ if (struct.credentials != null) {
+ oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC);
+ struct.credentials.write(oprot);
+ oprot.writeFieldEnd();
+ }
+ if (struct.filenames != null) {
+ oprot.writeFieldBegin(FILENAMES_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.filenames.size()));
+ for (String _iter317 : struct.filenames)
+ {
+ oprot.writeString(_iter317);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class removeLogs_argsTupleSchemeFactory implements SchemeFactory {
+ public removeLogs_argsTupleScheme getScheme() {
+ return new removeLogs_argsTupleScheme();
+ }
+ }
+
+ private static class removeLogs_argsTupleScheme extends TupleScheme<removeLogs_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, removeLogs_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol oprot = (TTupleProtocol) prot;
+ BitSet optionals = new BitSet();
+ if (struct.isSetTinfo()) {
+ optionals.set(0);
+ }
+ if (struct.isSetCredentials()) {
+ optionals.set(1);
+ }
+ if (struct.isSetFilenames()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
+ if (struct.isSetTinfo()) {
+ struct.tinfo.write(oprot);
+ }
+ if (struct.isSetCredentials()) {
+ struct.credentials.write(oprot);
+ }
+ if (struct.isSetFilenames()) {
+ {
+ oprot.writeI32(struct.filenames.size());
+ for (String _iter318 : struct.filenames)
+ {
+ oprot.writeString(_iter318);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, removeLogs_args struct) throws org.apache.thrift.TException {
+ TTupleProtocol iprot = (TTupleProtocol) prot;
+ BitSet incoming = iprot.readBitSet(3);
+ if (incoming.get(0)) {
+ struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo();
+ struct.tinfo.read(iprot);
+ struct.setTinfoIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+ struct.credentials.read(iprot);
+ struct.setCredentialsIsSet(true);
+ }
+ if (incoming.get(2)) {
+ {
+ org.apache.thrift.protocol.TList _list319 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ struct.filenames = new ArrayList<String>(_list319.size);
+ for (int _i320 = 0; _i320 < _list319.size; ++_i320)
+ {
+ String _elem321;
+ _elem321 = iprot.readString();
+ struct.filenames.add(_elem321);
+ }
+ }
+ struct.setFilenamesIsSet(true);
+ }
+ }
+ }
+
+ }
+
public static class getActiveLogs_args implements org.apache.thrift.TBase<getActiveLogs_args, getActiveLogs_args._Fields>, java.io.Serializable, Cloneable, Comparable<getActiveLogs_args> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getActiveLogs_args");
@@ -33839,13 +34556,13 @@ import org.slf4j.LoggerFactory;
case 0: // SUCCESS
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
- org.apache.thrift.protocol.TList _list314 = iprot.readListBegin();
- struct.success = new ArrayList<String>(_list314.size);
- for (int _i315 = 0; _i315 < _list314.size; ++_i315)
+ org.apache.thrift.protocol.TList _list322 = iprot.readListBegin();
+ struct.success = new ArrayList<String>(_list322.size);
+ for (int _i323 = 0; _i323 < _list322.size; ++_i323)
{
- String _elem316;
- _elem316 = iprot.readString();
- struct.success.add(_elem316);
+ String _elem324;
+ _elem324 = iprot.readString();
+ struct.success.add(_elem324);
}
iprot.readListEnd();
}
@@ -33873,9 +34590,9 @@ import org.slf4j.LoggerFactory;
oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.success.size()));
- for (String _iter317 : struct.success)
+ for (String _iter325 : struct.success)
{
- oprot.writeString(_iter317);
+ oprot.writeString(_iter325);
}
oprot.writeListEnd();
}
@@ -33906,9 +34623,9 @@ import org.slf4j.LoggerFactory;
if (struct.isSetSuccess()) {
{
oprot.writeI32(struct.success.size());
- for (String _iter318 : struct.success)
+ for (String _iter326 : struct.success)
{
- oprot.writeString(_iter318);
+ oprot.writeString(_iter326);
}
}
}
@@ -33920,13 +34637,13 @@ import org.slf4j.LoggerFactory;
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
{
- org.apache.thrift.protocol.TList _list319 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
- struct.success = new ArrayList<String>(_list319.size);
- for (int _i320 = 0; _i320 < _list319.size; ++_i320)
+ org.apache.thrift.protocol.TList _list327 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ struct.success = new ArrayList<String>(_list327.size);
+ for (int _i328 = 0; _i328 < _list327.size; ++_i328)
{
- String _elem321;
- _elem321 = iprot.readString();
- struct.success.add(_elem321);
+ String _elem329;
+ _elem329 = iprot.readString();
+ struct.success.add(_elem329);
}
}
struct.setSuccessIsSet(true);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index f25a08f..4a31036 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -205,6 +205,7 @@ service TabletClientService extends client.ClientService {
list<ActiveScan> getActiveScans(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec)
list<ActiveCompaction> getActiveCompactions(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec)
+ oneway void removeLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<string> filenames)
list<string> getActiveLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials)
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 270bb31..1749904 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -33,8 +33,10 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
public class MetaDataStateStore extends TabletStateStore {
+ private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
private static final int THREADS = 4;
private static final int LATENCY = 1000;
@@ -172,6 +174,7 @@ public class MetaDataStateStore extends TabletStateStore {
writer.addMutation(m);
}
} catch (Exception ex) {
+ log.error("Error marking logs as unused: " + logs);
throw new DistributedStoreException(ex);
} finally {
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
index dac7fe6..130364b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
@@ -43,9 +43,10 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.Lo
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.server.master.state.TabletLocationState.BadLocationStateException;
import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
public class MetaDataTableScanner implements ClosableIterator<TabletLocationState> {
- //private static final Logger log = Logger.getLogger(MetaDataTableScanner.class);
+ private static final Logger log = Logger.getLogger(MetaDataTableScanner.class);
BatchScanner mdScanner = null;
Iterator<Entry<Key,Value>> iter = null;
@@ -172,7 +173,9 @@ public class MetaDataTableScanner implements ClosableIterator<TabletLocationStat
}
}
if (extent == null) {
- throw new BadLocationStateException("No prev-row for key extent " + decodedRow, k.getRow());
+ String msg = "No prev-row for key extent " + decodedRow;
+ log.error(msg);
+ throw new BadLocationStateException(msg, k.getRow());
}
return new TabletLocationState(extent, future, current, last, walogs, chopped);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index de90d98..13db05b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -87,6 +87,9 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
store.setLocations(Collections.singletonList(assignment));
}
+ /**
+ * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets.
+ */
abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance, List<String>> logs) throws DistributedStoreException;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index a044434..66bad4e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -197,6 +197,7 @@ public class ZooTabletStateStore extends TabletStateStore {
@Override
public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<String>> logs) {
+ // the root table is not replicated, so unassigning the root tablet has removed the current log marker
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index a95cffa..ebf4b1b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -89,6 +89,7 @@ import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.tablets.TabletTime;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -1067,7 +1068,7 @@ public class MetadataTableUtil {
@Override
public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
- String[] parts = filename.split("/");
+ String[] parts = StringUtils.split(filename, '/');
String uniqueId = parts[parts.length - 1];
String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
rw.putPersistentData(path, filename.getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);
@@ -1075,7 +1076,7 @@ public class MetadataTableUtil {
});
} else {
Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString());
- m.put("log", filename, new Value(EMPTY_BYTES));
+ m.put(CurrentLogsSection.COLF, new Text(filename), new Value(EMPTY_BYTES));
String tableName = MetadataTable.NAME;
if (extent.isMeta()) {
tableName = RootTable.NAME;
@@ -1095,7 +1096,7 @@ public class MetadataTableUtil {
@Override
public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
- String[] parts = filename.split("/");
+ String[] parts = StringUtils.split(filename, '/');
String uniqueId = parts[parts.length - 1];
String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
log.debug("Removing entry " + path + " from zookeeper");
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 2561eec..a7703e9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -105,8 +105,8 @@ public class GarbageCollectWriteAheadLogs {
LiveTServerSet liveServers = new LiveTServerSet(context, new Listener() {
@Override
public void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added) {
- log.debug("New tablet server noticed: " + added);
- log.debug("Tablet server removed: " + deleted);
+ log.debug("New tablet servers noticed: " + added);
+ log.debug("Tablet servers removed: " + deleted);
}
});
Set<TServerInstance> currentServers = liveServers.getCurrentServers();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index bc4c64f..4490824 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -107,7 +107,7 @@ public class WorkMaker {
// Don't create the record if we have nothing to do.
// TODO put this into a filter on serverside
if (!shouldCreateWork(status)) {
- log.info("Not creating work: " + status.toString());
+ log.debug("Not creating work: " + status.toString());
continue;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index b12fccc..3b7ff03 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1715,6 +1715,11 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
String log = logger.getLogFile();
return Collections.singletonList(log);
}
+
+ @Override
+ public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException {
+ log.warn("Garbage collector is attempting to remove logs through the tablet server");
+ }
}
private class SplitRunner implements Runnable {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 6455726..46101c1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -37,7 +38,9 @@ import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
+import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.util.SimpleThreadPool;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -73,8 +76,8 @@ public class TabletServerLogger {
// The current logger
private DfsLogger currentLog = null;
- private DfsLogger nextLog = null;
- private Thread nextLogThread = null;
+ private final AtomicReference<DfsLogger> nextLog = new AtomicReference<>(null);
+ private final ThreadPoolExecutor nextLogMaker = new SimpleThreadPool(1, "WALog creator");
// The current generation of logs.
// Because multiple threads can be using a log at one time, a log
@@ -194,16 +197,16 @@ public class TabletServerLogger {
}
try {
- if (nextLog != null) {
- log.info("Using next log " + nextLog.getFileName());
- currentLog = nextLog;
- nextLog = null;
+ DfsLogger next = nextLog.getAndSet(null);
+ if (next != null) {
+ log.info("Using next log " + next.getFileName());
+ currentLog = next;
} else {
DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
alog.open(tserver.getClientAddressString());
currentLog = alog;
}
- if (nextLog == null) {
+ if (nextLog.get() == null) {
createNextLog();
}
logId.incrementAndGet();
@@ -217,31 +220,30 @@ public class TabletServerLogger {
}
}
+ // callers are synchronized already
private void createNextLog() {
- if (nextLogThread == null) {
- nextLogThread = new Thread() {
+ if (nextLogMaker.getActiveCount() == 0) {
+ nextLogMaker.submit(new Runnable() {
@Override
public void run() {
try {
- log.info("Creating next WAL");
- this.setName("Creating next WAL");
+ log.debug("Creating next WAL");
DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
alog.open(tserver.getClientAddressString());
for (Tablet tablet : tserver.getOnlineTablets()) {
- // TODO
tserver.addLoggersToMetadata(alog, tablet.getExtent());
}
- nextLog = alog;
-
- log.info("Created next WAL " + alog.getFileName());
+ log.debug("Created next WAL " + alog.getFileName());
+ alog = nextLog.getAndSet(alog);
+ if (alog != null) {
+ log.debug("closing unused next log: " + alog.getFileName());
+ alog.close();
+ }
} catch (Exception t) {
log.error(t, t);
- } finally {
- nextLogThread = null;
}
}
- };
- nextLogThread.start();
+ });
}
}
@@ -317,7 +319,7 @@ public class TabletServerLogger {
// Need to release
KeyExtent extent = commitSession.getExtent();
if (ReplicationConfigurationUtil.isEnabled(extent, tserver.getTableConfiguration(extent))) {
- Status status = Status.newBuilder().setInfiniteEnd(true).setCreatedTime(System.currentTimeMillis()).build();
+ Status status = StatusUtil.fileCreated(System.currentTimeMillis());
log.debug("Writing " + ProtobufUtil.toString(status) + " to metadata table for " + copy.getFileName());
// Got some new WALs, note this in the metadata table
ReplicationTableUtil.updateFiles(tserver, commitSession.getExtent(), copy.getFileName(), status);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index fa1ae86..0c1edfa 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -2377,6 +2377,7 @@ public class Tablet implements TabletCommitter {
private ConcurrentSkipListSet<DfsLogger> currentLogs = new ConcurrentSkipListSet<DfsLogger>();
+ // currentLogs may be updated while a tablet is otherwise locked
public Set<DfsLogger> getCurrentLogFiles() {
return new HashSet<DfsLogger>(currentLogs);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c8f3b7d3/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index b8a60c1..ac7fd70 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -228,6 +228,9 @@ public class NullTserver {
public List<String> getActiveLogs(TInfo tinfo, TCredentials credentials) throws TException {
return null;
}
+
+ @Override
+ public void removeLogs(TInfo tinfo, TCredentials credentials, List<String> filenames) throws TException { }
}
static class Opts extends Help {