You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/27 06:04:17 UTC
[1/6] ACCUMULO-2587 First addition of authentication between
replication service and client
Repository: accumulo
Updated Branches:
refs/heads/ACCUMULO-378 2425fd24b -> 9d9b5ed24
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java
index e5e26ca..e297445 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationServicer.java
@@ -50,17 +50,17 @@ import org.slf4j.LoggerFactory;
public interface Iface {
- public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, org.apache.thrift.TException;
+ public long replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException;
- public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, org.apache.thrift.TException;
+ public long replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException;
}
public interface AsyncIface {
- public void replicateLog(int remoteTableId, WalEdits data, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateLog_call> resultHandler) throws org.apache.thrift.TException;
+ public void replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateLog_call> resultHandler) throws org.apache.thrift.TException;
- public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException;
+ public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException;
}
@@ -84,17 +84,18 @@ import org.slf4j.LoggerFactory;
super(iprot, oprot);
}
- public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, org.apache.thrift.TException
+ public long replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException
{
- send_replicateLog(remoteTableId, data);
+ send_replicateLog(remoteTableId, data, credentials);
return recv_replicateLog();
}
- public void send_replicateLog(int remoteTableId, WalEdits data) throws org.apache.thrift.TException
+ public void send_replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException
{
replicateLog_args args = new replicateLog_args();
args.setRemoteTableId(remoteTableId);
args.setData(data);
+ args.setCredentials(credentials);
sendBase("replicateLog", args);
}
@@ -111,17 +112,18 @@ import org.slf4j.LoggerFactory;
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "replicateLog failed: unknown result");
}
- public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, org.apache.thrift.TException
+ public long replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws RemoteReplicationException, org.apache.thrift.TException
{
- send_replicateKeyValues(remoteTableId, data);
+ send_replicateKeyValues(remoteTableId, data, credentials);
return recv_replicateKeyValues();
}
- public void send_replicateKeyValues(int remoteTableId, KeyValues data) throws org.apache.thrift.TException
+ public void send_replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException
{
replicateKeyValues_args args = new replicateKeyValues_args();
args.setRemoteTableId(remoteTableId);
args.setData(data);
+ args.setCredentials(credentials);
sendBase("replicateKeyValues", args);
}
@@ -156,9 +158,9 @@ import org.slf4j.LoggerFactory;
super(protocolFactory, clientManager, transport);
}
- public void replicateLog(int remoteTableId, WalEdits data, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler) throws org.apache.thrift.TException {
+ public void replicateLog(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler) throws org.apache.thrift.TException {
checkReady();
- replicateLog_call method_call = new replicateLog_call(remoteTableId, data, resultHandler, this, ___protocolFactory, ___transport);
+ replicateLog_call method_call = new replicateLog_call(remoteTableId, data, credentials, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
@@ -166,10 +168,12 @@ import org.slf4j.LoggerFactory;
public static class replicateLog_call extends org.apache.thrift.async.TAsyncMethodCall {
private int remoteTableId;
private WalEdits data;
- public replicateLog_call(int remoteTableId, WalEdits data, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private org.apache.accumulo.core.security.thrift.TCredentials credentials;
+ public replicateLog_call(int remoteTableId, WalEdits data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateLog_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.remoteTableId = remoteTableId;
this.data = data;
+ this.credentials = credentials;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -177,6 +181,7 @@ import org.slf4j.LoggerFactory;
replicateLog_args args = new replicateLog_args();
args.setRemoteTableId(remoteTableId);
args.setData(data);
+ args.setCredentials(credentials);
args.write(prot);
prot.writeMessageEnd();
}
@@ -191,9 +196,9 @@ import org.slf4j.LoggerFactory;
}
}
- public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException {
+ public void replicateKeyValues(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler) throws org.apache.thrift.TException {
checkReady();
- replicateKeyValues_call method_call = new replicateKeyValues_call(remoteTableId, data, resultHandler, this, ___protocolFactory, ___transport);
+ replicateKeyValues_call method_call = new replicateKeyValues_call(remoteTableId, data, credentials, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
@@ -201,10 +206,12 @@ import org.slf4j.LoggerFactory;
public static class replicateKeyValues_call extends org.apache.thrift.async.TAsyncMethodCall {
private int remoteTableId;
private KeyValues data;
- public replicateKeyValues_call(int remoteTableId, KeyValues data, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private org.apache.accumulo.core.security.thrift.TCredentials credentials;
+ public replicateKeyValues_call(int remoteTableId, KeyValues data, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<replicateKeyValues_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.remoteTableId = remoteTableId;
this.data = data;
+ this.credentials = credentials;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -212,6 +219,7 @@ import org.slf4j.LoggerFactory;
replicateKeyValues_args args = new replicateKeyValues_args();
args.setRemoteTableId(remoteTableId);
args.setData(data);
+ args.setCredentials(credentials);
args.write(prot);
prot.writeMessageEnd();
}
@@ -260,7 +268,7 @@ import org.slf4j.LoggerFactory;
public replicateLog_result getResult(I iface, replicateLog_args args) throws org.apache.thrift.TException {
replicateLog_result result = new replicateLog_result();
try {
- result.success = iface.replicateLog(args.remoteTableId, args.data);
+ result.success = iface.replicateLog(args.remoteTableId, args.data, args.credentials);
result.setSuccessIsSet(true);
} catch (RemoteReplicationException e) {
result.e = e;
@@ -285,7 +293,7 @@ import org.slf4j.LoggerFactory;
public replicateKeyValues_result getResult(I iface, replicateKeyValues_args args) throws org.apache.thrift.TException {
replicateKeyValues_result result = new replicateKeyValues_result();
try {
- result.success = iface.replicateKeyValues(args.remoteTableId, args.data);
+ result.success = iface.replicateKeyValues(args.remoteTableId, args.data, args.credentials);
result.setSuccessIsSet(true);
} catch (RemoteReplicationException e) {
result.e = e;
@@ -301,6 +309,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField REMOTE_TABLE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("remoteTableId", org.apache.thrift.protocol.TType.I32, (short)1);
private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+ private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)3);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -310,11 +319,13 @@ import org.slf4j.LoggerFactory;
public int remoteTableId; // required
public WalEdits data; // required
+ public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
REMOTE_TABLE_ID((short)1, "remoteTableId"),
- DATA((short)2, "data");
+ DATA((short)2, "data"),
+ CREDENTIALS((short)3, "credentials");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -333,6 +344,8 @@ import org.slf4j.LoggerFactory;
return REMOTE_TABLE_ID;
case 2: // DATA
return DATA;
+ case 3: // CREDENTIALS
+ return CREDENTIALS;
default:
return null;
}
@@ -382,6 +395,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, WalEdits.class)));
+ tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(replicateLog_args.class, metaDataMap);
}
@@ -391,12 +406,14 @@ import org.slf4j.LoggerFactory;
public replicateLog_args(
int remoteTableId,
- WalEdits data)
+ WalEdits data,
+ org.apache.accumulo.core.security.thrift.TCredentials credentials)
{
this();
this.remoteTableId = remoteTableId;
setRemoteTableIdIsSet(true);
this.data = data;
+ this.credentials = credentials;
}
/**
@@ -408,6 +425,9 @@ import org.slf4j.LoggerFactory;
if (other.isSetData()) {
this.data = new WalEdits(other.data);
}
+ if (other.isSetCredentials()) {
+ this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials);
+ }
}
public replicateLog_args deepCopy() {
@@ -419,6 +439,7 @@ import org.slf4j.LoggerFactory;
setRemoteTableIdIsSet(false);
this.remoteTableId = 0;
this.data = null;
+ this.credentials = null;
}
public int getRemoteTableId() {
@@ -468,6 +489,30 @@ import org.slf4j.LoggerFactory;
}
}
+ public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() {
+ return this.credentials;
+ }
+
+ public replicateLog_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) {
+ this.credentials = credentials;
+ return this;
+ }
+
+ public void unsetCredentials() {
+ this.credentials = null;
+ }
+
+ /** Returns true if field credentials is set (has been assigned a value) and false otherwise */
+ public boolean isSetCredentials() {
+ return this.credentials != null;
+ }
+
+ public void setCredentialsIsSet(boolean value) {
+ if (!value) {
+ this.credentials = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case REMOTE_TABLE_ID:
@@ -486,6 +531,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case CREDENTIALS:
+ if (value == null) {
+ unsetCredentials();
+ } else {
+ setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value);
+ }
+ break;
+
}
}
@@ -497,6 +550,9 @@ import org.slf4j.LoggerFactory;
case DATA:
return getData();
+ case CREDENTIALS:
+ return getCredentials();
+
}
throw new IllegalStateException();
}
@@ -512,6 +568,8 @@ import org.slf4j.LoggerFactory;
return isSetRemoteTableId();
case DATA:
return isSetData();
+ case CREDENTIALS:
+ return isSetCredentials();
}
throw new IllegalStateException();
}
@@ -547,6 +605,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_credentials = true && this.isSetCredentials();
+ boolean that_present_credentials = true && that.isSetCredentials();
+ if (this_present_credentials || that_present_credentials) {
+ if (!(this_present_credentials && that_present_credentials))
+ return false;
+ if (!this.credentials.equals(that.credentials))
+ return false;
+ }
+
return true;
}
@@ -583,6 +650,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(typedOther.isSetCredentials());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCredentials()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, typedOther.credentials);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -614,6 +691,14 @@ import org.slf4j.LoggerFactory;
sb.append(this.data);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("credentials:");
+ if (this.credentials == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.credentials);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -624,6 +709,9 @@ import org.slf4j.LoggerFactory;
if (data != null) {
data.validate();
}
+ if (credentials != null) {
+ credentials.validate();
+ }
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -679,6 +767,15 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 3: // CREDENTIALS
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+ struct.credentials.read(iprot);
+ struct.setCredentialsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -702,6 +799,11 @@ import org.slf4j.LoggerFactory;
struct.data.write(oprot);
oprot.writeFieldEnd();
}
+ if (struct.credentials != null) {
+ oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC);
+ struct.credentials.write(oprot);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -726,19 +828,25 @@ import org.slf4j.LoggerFactory;
if (struct.isSetData()) {
optionals.set(1);
}
- oprot.writeBitSet(optionals, 2);
+ if (struct.isSetCredentials()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
if (struct.isSetRemoteTableId()) {
oprot.writeI32(struct.remoteTableId);
}
if (struct.isSetData()) {
struct.data.write(oprot);
}
+ if (struct.isSetCredentials()) {
+ struct.credentials.write(oprot);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, replicateLog_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(2);
+ BitSet incoming = iprot.readBitSet(3);
if (incoming.get(0)) {
struct.remoteTableId = iprot.readI32();
struct.setRemoteTableIdIsSet(true);
@@ -748,6 +856,11 @@ import org.slf4j.LoggerFactory;
struct.data.read(iprot);
struct.setDataIsSet(true);
}
+ if (incoming.get(2)) {
+ struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+ struct.credentials.read(iprot);
+ struct.setCredentialsIsSet(true);
+ }
}
}
@@ -1214,6 +1327,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField REMOTE_TABLE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("remoteTableId", org.apache.thrift.protocol.TType.I32, (short)1);
private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRUCT, (short)2);
+ private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)3);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -1223,11 +1337,13 @@ import org.slf4j.LoggerFactory;
public int remoteTableId; // required
public KeyValues data; // required
+ public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
REMOTE_TABLE_ID((short)1, "remoteTableId"),
- DATA((short)2, "data");
+ DATA((short)2, "data"),
+ CREDENTIALS((short)3, "credentials");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -1246,6 +1362,8 @@ import org.slf4j.LoggerFactory;
return REMOTE_TABLE_ID;
case 2: // DATA
return DATA;
+ case 3: // CREDENTIALS
+ return CREDENTIALS;
default:
return null;
}
@@ -1295,6 +1413,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, KeyValues.class)));
+ tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(replicateKeyValues_args.class, metaDataMap);
}
@@ -1304,12 +1424,14 @@ import org.slf4j.LoggerFactory;
public replicateKeyValues_args(
int remoteTableId,
- KeyValues data)
+ KeyValues data,
+ org.apache.accumulo.core.security.thrift.TCredentials credentials)
{
this();
this.remoteTableId = remoteTableId;
setRemoteTableIdIsSet(true);
this.data = data;
+ this.credentials = credentials;
}
/**
@@ -1321,6 +1443,9 @@ import org.slf4j.LoggerFactory;
if (other.isSetData()) {
this.data = new KeyValues(other.data);
}
+ if (other.isSetCredentials()) {
+ this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials);
+ }
}
public replicateKeyValues_args deepCopy() {
@@ -1332,6 +1457,7 @@ import org.slf4j.LoggerFactory;
setRemoteTableIdIsSet(false);
this.remoteTableId = 0;
this.data = null;
+ this.credentials = null;
}
public int getRemoteTableId() {
@@ -1381,6 +1507,30 @@ import org.slf4j.LoggerFactory;
}
}
+ public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() {
+ return this.credentials;
+ }
+
+ public replicateKeyValues_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) {
+ this.credentials = credentials;
+ return this;
+ }
+
+ public void unsetCredentials() {
+ this.credentials = null;
+ }
+
+ /** Returns true if field credentials is set (has been assigned a value) and false otherwise */
+ public boolean isSetCredentials() {
+ return this.credentials != null;
+ }
+
+ public void setCredentialsIsSet(boolean value) {
+ if (!value) {
+ this.credentials = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case REMOTE_TABLE_ID:
@@ -1399,6 +1549,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case CREDENTIALS:
+ if (value == null) {
+ unsetCredentials();
+ } else {
+ setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value);
+ }
+ break;
+
}
}
@@ -1410,6 +1568,9 @@ import org.slf4j.LoggerFactory;
case DATA:
return getData();
+ case CREDENTIALS:
+ return getCredentials();
+
}
throw new IllegalStateException();
}
@@ -1425,6 +1586,8 @@ import org.slf4j.LoggerFactory;
return isSetRemoteTableId();
case DATA:
return isSetData();
+ case CREDENTIALS:
+ return isSetCredentials();
}
throw new IllegalStateException();
}
@@ -1460,6 +1623,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_credentials = true && this.isSetCredentials();
+ boolean that_present_credentials = true && that.isSetCredentials();
+ if (this_present_credentials || that_present_credentials) {
+ if (!(this_present_credentials && that_present_credentials))
+ return false;
+ if (!this.credentials.equals(that.credentials))
+ return false;
+ }
+
return true;
}
@@ -1496,6 +1668,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(typedOther.isSetCredentials());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCredentials()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, typedOther.credentials);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1527,6 +1709,14 @@ import org.slf4j.LoggerFactory;
sb.append(this.data);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("credentials:");
+ if (this.credentials == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.credentials);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -1537,6 +1727,9 @@ import org.slf4j.LoggerFactory;
if (data != null) {
data.validate();
}
+ if (credentials != null) {
+ credentials.validate();
+ }
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -1592,6 +1785,15 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 3: // CREDENTIALS
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+ struct.credentials.read(iprot);
+ struct.setCredentialsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1615,6 +1817,11 @@ import org.slf4j.LoggerFactory;
struct.data.write(oprot);
oprot.writeFieldEnd();
}
+ if (struct.credentials != null) {
+ oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC);
+ struct.credentials.write(oprot);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1639,19 +1846,25 @@ import org.slf4j.LoggerFactory;
if (struct.isSetData()) {
optionals.set(1);
}
- oprot.writeBitSet(optionals, 2);
+ if (struct.isSetCredentials()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
if (struct.isSetRemoteTableId()) {
oprot.writeI32(struct.remoteTableId);
}
if (struct.isSetData()) {
struct.data.write(oprot);
}
+ if (struct.isSetCredentials()) {
+ struct.credentials.write(oprot);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, replicateKeyValues_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(2);
+ BitSet incoming = iprot.readBitSet(3);
if (incoming.get(0)) {
struct.remoteTableId = iprot.readI32();
struct.setRemoteTableIdIsSet(true);
@@ -1661,6 +1874,11 @@ import org.slf4j.LoggerFactory;
struct.data.read(iprot);
struct.setDataIsSet(true);
}
+ if (incoming.get(2)) {
+ struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+ struct.credentials.read(iprot);
+ struct.setCredentialsIsSet(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/thrift/replication.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/replication.thrift b/core/src/main/thrift/replication.thrift
index a5d1836..392e913 100644
--- a/core/src/main/thrift/replication.thrift
+++ b/core/src/main/thrift/replication.thrift
@@ -19,6 +19,7 @@ namespace java org.apache.accumulo.core.replication.thrift
namespace cpp org.apache.accumulo.core.replication.thrift
include "data.thrift"
+include "security.thrift"
struct WalEdits {
1:list<binary> edits
@@ -28,21 +29,35 @@ struct KeyValues {
1:list<data.TKeyValue> keyValues
}
-exception RemoteCoordinationException {
- 1:i32 code,
+enum RemoteReplicationErrorCode {
+ COULD_NOT_DESERIALIZE
+ COULD_NOT_APPLY
+ TABLE_DOES_NOT_EXIST
+ CANNOT_AUTHENTICATE
+ CANNOT_INSTANTIATE_REPLAYER
+}
+
+enum ReplicationCoordinatorErrorCode {
+ NO_AVAILABLE_SERVERS
+ SERVICE_CONFIGURATION_UNAVAILABLE
+ CANNOT_AUTHENTICATE
+}
+
+exception ReplicationCoordinatorException {
+ 1:ReplicationCoordinatorErrorCode code,
2:string reason
}
exception RemoteReplicationException {
- 1:i32 code,
+ 1:RemoteReplicationErrorCode code,
2:string reason
}
service ReplicationCoordinator {
- string getServicerAddress(1:i32 remoteTableId) throws (1:RemoteCoordinationException e),
+ string getServicerAddress(1:i32 remoteTableId, 2:security.TCredentials credentials) throws (1:ReplicationCoordinatorException e),
}
service ReplicationServicer {
- i64 replicateLog(1:i32 remoteTableId, 2:WalEdits data) throws (1:RemoteReplicationException e),
- i64 replicateKeyValues(1:i32 remoteTableId, 2:KeyValues data) throws (1:RemoteReplicationException e)
+ i64 replicateLog(1:i32 remoteTableId, 2:WalEdits data, 3:security.TCredentials credentials) throws (1:RemoteReplicationException e),
+ i64 replicateKeyValues(1:i32 remoteTableId, 2:KeyValues data, 3:security.TCredentials credentials) throws (1:RemoteReplicationException e)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
index 9331075..974aaa9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/MasterReplicationCoordinator.java
@@ -23,13 +23,17 @@ import java.util.Set;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.replication.ReplicationCoordinatorErrorCode;
-import org.apache.accumulo.core.replication.thrift.RemoteCoordinationException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorErrorCode;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinatorException;
+import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.security.SecurityOperation;
+import org.apache.accumulo.server.security.SystemCredentials;
import org.apache.thrift.TException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -47,6 +51,7 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac
private final Instance inst;
private final Random rand;
private final ZooReader reader;
+ private final SecurityOperation security;
public MasterReplicationCoordinator(Master master) {
this(master, new ZooReader(master.getInstance().getZooKeepers(), master.getInstance().getZooKeepersSessionTimeOut()));
@@ -57,15 +62,22 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac
this.rand = new Random(358923462l);
this.inst = master.getInstance();
this.reader = reader;
-
+ this.security = SecurityOperation.getInstance(inst.getInstanceID(), false);
}
@Override
- public String getServicerAddress(int remoteTableId) throws RemoteCoordinationException, TException {
+ public String getServicerAddress(int remoteTableId, TCredentials creds) throws ReplicationCoordinatorException, TException {
+ try {
+ security.authenticateUser(SystemCredentials.get().toThrift(inst), creds);
+ } catch (ThriftSecurityException e) {
+ log.error("{} failed to authenticate for replication to {}", creds.getPrincipal(), remoteTableId);
+ throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.CANNOT_AUTHENTICATE, "Could not authenticate " + creds.getPrincipal());
+ }
+
Set<TServerInstance> tservers = master.onlineTabletServers();
if (tservers.isEmpty()) {
- throw new RemoteCoordinationException(ReplicationCoordinatorErrorCode.NO_AVAILABLE_SERVERS.ordinal(), "No tservers are available for replication");
+ throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.NO_AVAILABLE_SERVERS, "No tservers are available for replication");
}
TServerInstance tserver = getRandomTServer(tservers, rand.nextInt(tservers.size()));
@@ -74,7 +86,7 @@ public class MasterReplicationCoordinator implements ReplicationCoordinator.Ifac
replServiceAddr = new String(reader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver.hostPort(), null), StandardCharsets.UTF_8);
} catch (KeeperException | InterruptedException e) {
log.error("Could not fetch repliation service port for tserver", e);
- throw new RemoteCoordinationException(ReplicationCoordinatorErrorCode.SERVICE_CONFIGURATION_UNAVAILABLE.ordinal(),
+ throw new ReplicationCoordinatorException(ReplicationCoordinatorErrorCode.SERVICE_CONFIGURATION_UNAVAILABLE,
"Could not determine port for replication service running at " + tserver.hostPort());
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index ca1382f..c6b266f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.AccumuloException;
@@ -34,6 +35,7 @@ import org.apache.accumulo.core.client.impl.ClientExecReturn;
import org.apache.accumulo.core.client.impl.ReplicationClient;
import org.apache.accumulo.core.client.impl.ServerConfigurationUtil;
import org.apache.accumulo.core.client.replication.ReplicaSystem;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
@@ -45,6 +47,8 @@ import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Client;
import org.apache.accumulo.core.replication.thrift.WalEdits;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.accumulo.server.conf.ServerConfiguration;
@@ -137,8 +141,10 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
@Override
public Status replicate(final Path p, final Status status, final ReplicationTarget target) {
- Instance localInstance = HdfsZooInstance.getInstance();
- AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance);
+ final Instance localInstance = HdfsZooInstance.getInstance();
+ final AccumuloConfiguration localConf = ServerConfigurationUtil.getConfiguration(localInstance);
+ Credentials credentialsForPeer = getCredentialsForPeer(localConf, target);
+ final TCredentials tCredsForPeer = credentialsForPeer.toThrift(localInstance);
Instance peerInstance = getPeerInstance(target);
// Remote identifier is an integer (table id) in this case.
@@ -154,7 +160,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
@Override
public String execute(ReplicationCoordinator.Client client) throws Exception {
- return client.getServicerAddress(remoteTableId);
+ return client.getServicerAddress(remoteTableId, tCredsForPeer);
}
});
@@ -184,7 +190,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
if (p.getName().endsWith(RFILE_SUFFIX)) {
RFileReplication kvs = getKeyValues(target, p, status, sizeLimit);
if (0 < kvs.keyValues.getKeyValuesSize()) {
- long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues);
+ long entriesReplicated = client.replicateKeyValues(remoteTableId, kvs.keyValues, tCredsForPeer);
if (entriesReplicated != kvs.keyValues.getKeyValuesSize()) {
log.warn("Sent {} KeyValue entries for replication but only {} were reported as replicated", kvs.keyValues.getKeyValuesSize(),
entriesReplicated);
@@ -198,7 +204,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
// If we have some edits to send
if (0 < edits.walEdits.getEditsSize()) {
- long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits);
+ long entriesReplicated = client.replicateLog(remoteTableId, edits.walEdits, tCredsForPeer);
if (entriesReplicated != edits.numUpdates) {
log.warn("Sent {} WAL entries for replication but {} were reported as replicated", edits.numUpdates, entriesReplicated);
}
@@ -241,6 +247,24 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
return status;
}
+ protected Credentials getCredentialsForPeer(AccumuloConfiguration conf, ReplicationTarget target) {
+ Preconditions.checkNotNull(conf);
+ Preconditions.checkNotNull(target);
+
+ String peerName = target.getPeerName();
+ String userKey = Property.REPLICATION_PEER_USER.getKey() + peerName, passwordKey = Property.REPLICATION_PEER_PASSWORD.getKey() + peerName;
+ Map<String,String> peerUsers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_USER);
+ Map<String,String> peerPasswords = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD);
+
+ String user = peerUsers.get(userKey);
+ String password = peerPasswords.get(passwordKey);
+ if (null == user || null == password) {
+ throw new IllegalArgumentException(userKey + " and " + passwordKey + " not configured, cannot replicate");
+ }
+
+ return new Credentials(user, new PasswordToken(password));
+ }
+
protected Instance getPeerInstance(ReplicationTarget target) {
return new ZooKeeperInstance(instanceName, zookeepers);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
index ea50199..8b1a402 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayer.java
@@ -29,8 +29,8 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.replication.AccumuloReplicationReplayer;
-import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
import org.apache.accumulo.core.replication.thrift.KeyValues;
+import org.apache.accumulo.core.replication.thrift.RemoteReplicationErrorCode;
import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
import org.apache.accumulo.core.replication.thrift.WalEdits;
import org.apache.accumulo.server.client.HdfsZooInstance;
@@ -64,7 +64,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
value.readFields(dis);
} catch (IOException e) {
log.error("Could not deserialize edit from stream", e);
- throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE.ordinal(), "Could not deserialize edit from stream");
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_DESERIALIZE, "Could not deserialize edit from stream");
}
// Create the batchScanner if we don't already have one.
@@ -74,7 +74,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
try {
bw = conn.createBatchWriter(tableName, bwConfig);
} catch (TableNotFoundException e) {
- throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table " + tableName + " does not exist");
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST, "Table " + tableName + " does not exist");
}
}
@@ -84,7 +84,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
bw.addMutations(value.mutations);
} catch (MutationsRejectedException e) {
log.error("Could not apply mutations to {}", tableName);
- throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + tableName);
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY, "Could not apply mutations to " + tableName);
}
mutationsApplied += value.mutations.size();
@@ -95,7 +95,7 @@ public class BatchWriterReplicationReplayer implements AccumuloReplicationReplay
bw.close();
} catch (MutationsRejectedException e) {
log.error("Could not apply mutations to {}", tableName);
- throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY.ordinal(), "Could not apply mutations to " + tableName);
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.COULD_NOT_APPLY, "Could not apply mutations to " + tableName);
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
index 820c586..3a9bf9b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationServicerHandler.java
@@ -28,13 +28,13 @@ import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.replication.AccumuloReplicationReplayer;
-import org.apache.accumulo.core.replication.RemoteReplicationErrorCode;
import org.apache.accumulo.core.replication.thrift.KeyValues;
+import org.apache.accumulo.core.replication.thrift.RemoteReplicationErrorCode;
import org.apache.accumulo.core.replication.thrift.RemoteReplicationException;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer.Iface;
import org.apache.accumulo.core.replication.thrift.WalEdits;
import org.apache.accumulo.core.security.Credentials;
-import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,11 +52,11 @@ public class ReplicationServicerHandler implements Iface {
}
@Override
- public long replicateLog(int remoteTableId, WalEdits data) throws RemoteReplicationException, TException {
+ public long replicateLog(int remoteTableId, WalEdits data, TCredentials tcreds) throws RemoteReplicationException, TException {
log.debug("Got replication request to tableID {} with {} edits", remoteTableId, data.getEditsSize());
String tableId = Integer.toString(remoteTableId);
- Credentials creds = SystemCredentials.get();
+ Credentials creds = Credentials.fromThrift(tcreds);
Connector conn;
String tableName;
@@ -64,14 +64,14 @@ public class ReplicationServicerHandler implements Iface {
conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
} catch (AccumuloException | AccumuloSecurityException e) {
log.error("Could not get connection", e);
- throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE.ordinal(), "Cannot get connector");
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_AUTHENTICATE, "Cannot get connector as " + creds.getPrincipal());
}
try {
tableName = Tables.getTableName(inst, tableId);
} catch (TableNotFoundException e) {
log.error("Could not find table with id {}", tableId);
- throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST.ordinal(), "Table with id " + tableId + " does not exist");
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.TABLE_DOES_NOT_EXIST, "Table with id " + tableId + " does not exist");
}
AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(inst);
@@ -96,7 +96,7 @@ public class ReplicationServicerHandler implements Iface {
clz = untypedClz.asSubclass(AccumuloReplicationReplayer.class);
} catch (ClassNotFoundException e) {
log.error("Could not instantiate replayer class {}", handlerClassForTable, e);
- throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class "
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class "
+ handlerClassForTable);
}
@@ -106,7 +106,7 @@ public class ReplicationServicerHandler implements Iface {
replayer = clz.newInstance();
} catch (InstantiationException | IllegalAccessException e1) {
log.error("Could not instantiate replayer class {}", clz.getName());
- throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER.ordinal(), "Could not instantiate replayer class"
+ throw new RemoteReplicationException(RemoteReplicationErrorCode.CANNOT_INSTANTIATE_REPLAYER, "Could not instantiate replayer class"
+ clz.getName());
}
@@ -116,7 +116,7 @@ public class ReplicationServicerHandler implements Iface {
}
@Override
- public long replicateKeyValues(int remoteTableId, KeyValues data) throws RemoteReplicationException, TException {
+ public long replicateKeyValues(int remoteTableId, KeyValues data, TCredentials creds) throws RemoteReplicationException, TException {
throw new UnsupportedOperationException();
}
[4/6] git commit: ACCUMULO-378 Make the MockReplicaSystem a bit more
honest and only set full replication when closed and inf length
Posted by el...@apache.org.
ACCUMULO-378 Make the MockReplicaSystem a bit more honest and only set full replication when closed and inf length
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3b727cf9
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3b727cf9
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3b727cf9
Branch: refs/heads/ACCUMULO-378
Commit: 3b727cf94f4341d39adf07dcbd62361b7d4a0de3
Parents: 3605275
Author: Josh Elser <el...@apache.org>
Authored: Mon May 26 13:48:21 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 26 13:48:21 2014 -0400
----------------------------------------------------------------------
.../test/replication/MockReplicaSystem.java | 25 +++++++++++++++-----
1 file changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3b727cf9/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
index fa6ca2f..ac44f97 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/MockReplicaSystem.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.test.replication;
import org.apache.accumulo.core.client.replication.ReplicaSystem;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +38,18 @@ public class MockReplicaSystem implements ReplicaSystem {
@Override
public Status replicate(Path p, Status status, ReplicationTarget target) {
- Status.Builder builder = Status.newBuilder(status);
- if (status.getInfiniteEnd()) {
- builder.setBegin(Long.MAX_VALUE);
+ Status newStatus;
+ if (status.getClosed() && status.getInfiniteEnd()) {
+ Status.Builder builder = Status.newBuilder(status);
+ if (status.getInfiniteEnd()) {
+ builder.setBegin(Long.MAX_VALUE);
+ } else {
+ builder.setBegin(status.getEnd());
+ }
+ newStatus = builder.build();
} else {
- builder.setBegin(status.getEnd());
+ log.info("{} with status {} is not closed and with infinite length, ignoring");
+ newStatus = status;
}
try {
@@ -52,13 +60,18 @@ public class MockReplicaSystem implements ReplicaSystem {
return status;
}
- Status newStatus = builder.build();
- log.info("Received {} returned {}", TextFormat.shortDebugString(status), TextFormat.shortDebugString(newStatus));
+ log.info("Received {}, returned {}", TextFormat.shortDebugString(status), TextFormat.shortDebugString(newStatus));
+
return newStatus;
}
@Override
public void configure(String configuration) {
+ if (StringUtils.isBlank(configuration)) {
+ log.debug("No configuration, using default sleep of {}", sleep);
+ return;
+ }
+
try {
sleep = Long.parseLong(configuration);
} catch (NumberFormatException e) {
[6/6] git commit: ACCUMULO-378 Test class consolidation
Posted by el...@apache.org.
ACCUMULO-378 Test class consolidation
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9d9b5ed2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9d9b5ed2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9d9b5ed2
Branch: refs/heads/ACCUMULO-378
Commit: 9d9b5ed24f3e425459108a993ab2cea121d1b612
Parents: 3b727cf
Author: Josh Elser <el...@apache.org>
Authored: Mon May 26 13:48:55 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Mon May 26 13:48:55 2014 -0400
----------------------------------------------------------------------
.../replication/SequentialWorkAssigner.java | 2 +-
.../replication/ReplicationProcessor.java | 4 +
.../replication/MultiTserverReplicationIT.java | 113 ++
.../replication/ReplicationDeadlockTest.java | 170 ---
.../ReplicationFilesClosedAfterUnusedTest.java | 172 ---
.../test/replication/ReplicationIT.java | 338 +++++-
.../ReplicationPortAdvertisementIT.java | 113 --
.../replication/ReplicationSequentialIT.java | 402 -------
.../replication/ReplicationSourceOnlyIT.java | 208 ----
.../replication/ReplicationTablesMacTest.java | 90 --
.../test/replication/ReplicationTest.java | 1135 +++++++++++++++++-
.../test/replication/ReplicationWithGCIT.java | 554 ---------
.../replication/ReplicationWithMakerTest.java | 337 ------
13 files changed, 1588 insertions(+), 2050 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
index f2d110a..af43d7d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/SequentialWorkAssigner.java
@@ -297,7 +297,7 @@ public class SequentialWorkAssigner extends AbstractWorkAssigner {
log.debug("Not queueing {} for work as {} must be replicated to {} first", file, keyBeingReplicated, target.getPeerName());
}
} else {
- log.debug("Not queueing work for {} because {} doesn't need replication", file, ProtobufUtil.toString(status));
+ log.debug("Not queueing work for {} to {} because {} doesn't need replication", file, target, ProtobufUtil.toString(status));
if (key.equals(keyBeingReplicated)) {
log.debug("Removing {} from replication state to {} because replication is complete", keyBeingReplicated, target.getPeerName());
queuedWorkForPeer.remove(sourceTableId);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index f6fe91f..50c79d6 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -18,6 +18,7 @@ package org.apache.accumulo.tserver.replication;
import java.io.IOException;
import java.util.Map;
+import java.util.NoSuchElementException;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -96,6 +97,9 @@ public class ReplicationProcessor implements Processor {
} catch (InvalidProtocolBufferException e) {
log.error("Could not deserialize Status from Work section for {} and ", file, target);
throw new RuntimeException("Could not parse Status for work record", e);
+ } catch (NoSuchElementException e) {
+ log.error("Assigned work for {} to {} but could not find work record", file, target);
+ return;
}
log.debug("Current status for {} replicating to {}: {}", file, target, ProtobufUtil.toString(status));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
new file mode 100644
index 0000000..96e8b52
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/replication/MultiTserverReplicationIT.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+import com.google.common.net.HostAndPort;
+
+/**
+ *
+ */
+public class MultiTserverReplicationIT extends ConfigurableMacIT {
+ private static final Logger log = LoggerFactory.getLogger(MultiTserverReplicationIT.class);
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setNumTservers(2);
+ }
+
+ @Test
+ public void tserverReplicationServicePortsAreAdvertised() throws Exception {
+ // Wait for the cluster to be up
+ Connector conn = getConnector();
+ Instance inst = conn.getInstance();
+
+ // Wait for a tserver to come up to fulfill this request
+ conn.tableOperations().create("foo");
+ Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
+ Assert.assertEquals(0, Iterables.size(s));
+
+ ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+ Set<String> tserverHost = new HashSet<>();
+ tserverHost.addAll(zreader.getChildren(ZooUtil.getRoot(inst) + Constants.ZTSERVERS));
+
+ Set<HostAndPort> replicationServices = new HashSet<>();
+
+ for (String tserver : tserverHost) {
+ try {
+ byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver, null);
+ HostAndPort replAddress = HostAndPort.fromString(new String(portData, StandardCharsets.UTF_8));
+ replicationServices.add(replAddress);
+ } catch (Exception e) {
+ log.error("Could not find port for {}", tserver, e);
+ Assert.fail("Did not find replication port advertisement for " + tserver);
+ }
+ }
+
+ // Each tserver should also have equal replication services running internally
+ Assert.assertEquals("Expected an equal number of replication servicers and tservers", tserverHost.size(), replicationServices.size());
+ }
+
+ @Test
+ public void masterReplicationServicePortsAreAdvertised() throws Exception {
+ // Wait for the cluster to be up
+ Connector conn = getConnector();
+ Instance inst = conn.getInstance();
+
+ // Wait for a tserver to come up to fulfill this request
+ conn.tableOperations().create("foo");
+ Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
+ Assert.assertEquals(0, Iterables.size(s));
+
+ ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
+
+ // Should have one master instance
+ Assert.assertEquals(1, inst.getMasterLocations().size());
+
+ // Get the master thrift service addr
+ String masterAddr = Iterables.getOnlyElement(inst.getMasterLocations());
+
+ // Get the master replication coordinator addr
+ String replCoordAddr = new String(zreader.getData(ZooUtil.getRoot(inst) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, null), StandardCharsets.UTF_8);
+
+ // They shouldn't be the same
+ Assert.assertNotEquals(masterAddr, replCoordAddr);
+
+ // Neither should be zero as the port
+ Assert.assertNotEquals(0, HostAndPort.fromString(masterAddr).getPort());
+ Assert.assertNotEquals(0, HostAndPort.fromString(replCoordAddr).getPort());
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
deleted file mode 100644
index 418d717..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationDeadlockTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Test;
-
-/**
- *
- */
-public class ReplicationDeadlockTest extends ConfigurableMacIT {
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- private Set<String> metadataWals(Connector conn) throws Exception {
- Scanner s = conn.createScanner(MetadataTable.NAME, new Authorizations());
- s.fetchColumnFamily(LogColumnFamily.NAME);
- Set<String> metadataWals = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String log : logEntry.logSet) {
- metadataWals.add(new Path(log).toString());
- }
- }
- return metadataWals;
- }
-
- @Test(timeout = 60 * 1000)
- public void noDeadlock() throws Exception {
- final Connector conn = getConnector();
-
- if (conn.tableOperations().exists(ReplicationTable.NAME)) {
- conn.tableOperations().delete(ReplicationTable.NAME);
- }
-
- ReplicationTable.create(conn);
- conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
- final AtomicBoolean keepRunning = new AtomicBoolean(true);
- final Set<String> metadataWals = new HashSet<>();
-
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- // Should really be able to interrupt here, but the Scanner throws a fit to the logger
- // when that happens
- while (keepRunning.get()) {
- try {
- metadataWals.addAll(metadataWals(conn));
- } catch (Exception e) {
- log.error("Metadata table doesn't exist");
- }
- }
- }
-
- });
-
- t.start();
-
- String table1 = "table1", table2 = "table2", table3 = "table3";
-
- conn.tableOperations().create(table1);
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- conn.tableOperations().create(table2);
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- conn.tableOperations().create(table3);
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Flush everything to try to make the replication records
- for (String table : Arrays.asList(table1, table2, table3)) {
- conn.tableOperations().flush(table, null, null, true);
- }
-
- keepRunning.set(false);
- t.join(5000);
-
- for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
- Scanner s = conn.createScanner(table, new Authorizations());
- for (@SuppressWarnings("unused")
- Entry<Key,Value> entry : s) {}
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
deleted file mode 100644
index eb89317..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationFilesClosedAfterUnusedTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.HashSet;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- */
-public class ReplicationFilesClosedAfterUnusedTest extends ConfigurableMacIT {
- private static final Logger log = LoggerFactory.getLogger(ReplicationFilesClosedAfterUnusedTest.class);
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
- cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0s");
- cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "0s");
- cfg.setProperty(Property.REPLICATION_NAME, "master");
- cfg.setNumTservers(1);
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- @Test(timeout = 60000)
- public void test() throws Exception {
- Connector conn = getConnector();
-
- String table = "table";
- conn.tableOperations().create(table);
- String tableId = conn.tableOperations().tableIdMap().get(table);
-
- Assert.assertNotNull(tableId);
-
- log.info("Writing to {}", tableId);
-
- conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- // just sleep
- conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
- ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
-
- // Write a mutation to make a log file
- BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
- Mutation m = new Mutation("one");
- m.put("", "", "");
- bw.addMutation(m);
- bw.close();
-
- // Write another to make sure the logger rolls itself?
- bw = conn.createBatchWriter(table, new BatchWriterConfig());
- m = new Mutation("three");
- m.put("", "", "");
- bw.addMutation(m);
- bw.close();
-
- Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
- s.setRange(TabletsSection.getRange(tableId));
- Set<String> wals = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String file : logEntry.logSet) {
- Path p = new Path(file);
- wals.add(p.toString());
- }
- }
-
- log.warn("Found wals {}", wals);
-
- // for (int j = 0; j < 5; j++) {
- bw = conn.createBatchWriter(table, new BatchWriterConfig());
- m = new Mutation("three");
- byte[] bytes = new byte[1024 * 1024];
- m.put("1".getBytes(), new byte[0], bytes);
- m.put("2".getBytes(), new byte[0], bytes);
- m.put("3".getBytes(), new byte[0], bytes);
- m.put("4".getBytes(), new byte[0], bytes);
- m.put("5".getBytes(), new byte[0], bytes);
- bw.addMutation(m);
- bw.close();
-
- conn.tableOperations().flush(table, null, null, true);
-
- while (!conn.tableOperations().exists(ReplicationTable.NAME)) {
- UtilWaitThread.sleep(500);
- }
-
- for (int i = 0; i < 5; i++) {
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.fetchColumnFamily(LogColumnFamily.NAME);
- s.setRange(TabletsSection.getRange(tableId));
- for (Entry<Key,Value> entry : s) {
- log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
- }
-
- s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Text buff = new Text();
- boolean allReferencedLogsClosed = true;
- int recordsFound = 0;
- for (Entry<Key,Value> e : s) {
- recordsFound++;
- allReferencedLogsClosed = true;
- StatusSection.getFile(e.getKey(), buff);
- String file = buff.toString();
- if (wals.contains(file)) {
- Status stat = Status.parseFrom(e.getValue().get());
- if (!stat.getClosed()) {
- log.info("{} wasn't closed", file);
- allReferencedLogsClosed = false;
- }
- }
- }
-
- if (recordsFound > 0 && allReferencedLogsClosed) {
- return;
- }
-
- Thread.sleep(1000);
- }
-
- Assert.fail("We had a file that was referenced but didn't get closed");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index f34b626..db21586 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -19,6 +19,12 @@ package org.apache.accumulo.test.replication;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -31,13 +37,16 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.master.replication.SequentialWorkAssigner;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
@@ -48,7 +57,9 @@ import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +67,20 @@ import org.slf4j.LoggerFactory;
public class ReplicationIT extends ConfigurableMacIT {
private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class);
+ private ExecutorService executor;
+
+ @Before
+ public void createExecutor() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @After
+ public void stopExecutor() {
+ if (null != executor) {
+ executor.shutdownNow();
+ }
+ }
+
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setNumTservers(1);
@@ -66,6 +91,7 @@ public class ReplicationIT extends ConfigurableMacIT {
cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
cfg.setProperty(Property.REPLICATION_NAME, "master");
+ cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@@ -79,6 +105,314 @@ public class ReplicationIT extends ConfigurableMacIT {
peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+ peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+ MiniAccumuloClusterImpl peerCluster = peerCfg.build();
+
+ peerCluster.start();
+
+ try {
+ final Connector connMaster = getConnector();
+ final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
+
+ ReplicationTable.create(connMaster);
+
+ String peerUserName = "peer", peerPassword = "foo";
+
+ String peerClusterName = "peer";
+
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
+ // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+ connMaster.instanceOperations().setProperty(
+ Property.REPLICATION_PEERS.getKey() + peerClusterName,
+ ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+ AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
+
+ final String masterTable = "master", peerTable = "peer";
+
+ connMaster.tableOperations().create(masterTable);
+ String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
+ Assert.assertNotNull(masterTableId);
+
+ connPeer.tableOperations().create(peerTable);
+ String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
+ Assert.assertNotNull(peerTableId);
+
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
+
+ // Replicate this table to the peerClusterName in a table with the peerTableId table id
+ connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
+
+ // Write some data to table1
+ BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
+ for (int rows = 0; rows < 5000; rows++) {
+ Mutation m = new Mutation(Integer.toString(rows));
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ log.info("Wrote all data to master cluster");
+
+// log.debug("");
+// for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+// if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+// log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+// } else {
+// log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+// }
+// }
+
+ final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
+
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.exec(TabletServer.class);
+
+ log.info("TabletServer restarted");
+ for (@SuppressWarnings("unused")
+ Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
+ log.info("TabletServer is online");
+
+ log.info("");
+ log.info("Fetching metadata records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+ if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ } else {
+ log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+ }
+ }
+
+ log.info("");
+ log.info("Fetching replication records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ }
+
+ Future<Boolean> future = executor.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
+ log.info("Drain completed");
+ return true;
+ }
+
+ });
+
+ try {
+ future.get(30, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ future.cancel(true);
+ Assert.fail("Drain did not finish within 30 seconds");
+ }
+
+ log.info("drain completed");
+
+ log.info("");
+ log.info("Fetching metadata records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
+ if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ } else {
+ log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
+ }
+ }
+
+ log.info("");
+ log.info("Fetching replication records:");
+ for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
+ log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
+ }
+
+ Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
+ Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
+ Entry<Key,Value> masterEntry = null, peerEntry = null;
+ while (masterIter.hasNext() && peerIter.hasNext()) {
+ masterEntry = masterIter.next();
+ peerEntry = peerIter.next();
+ Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
+ masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
+ Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
+ }
+
+ log.info("Last master entry: " + masterEntry);
+ log.info("Last peer entry: " + peerEntry);
+
+ Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
+ Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
+ } finally {
+ peerCluster.stop();
+ }
+ }
+
+ @Test(timeout = 60 * 5000)
+ public void dataReplicatedToCorrectTable() throws Exception {
+ MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+ ROOT_PASSWORD);
+ peerCfg.setNumTservers(1);
+ peerCfg.setInstanceName("peer");
+ peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+ peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+ peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+ peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
+ peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
+ MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
+
+ peer1Cluster.start();
+
+ try {
+ Connector connMaster = getConnector();
+ Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
+
+ String peerClusterName = "peer";
+ String peerUserName = "peer", peerPassword = "foo";
+
+ // Create the peer-side user whose name and password are configured on the master below
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
+ // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
+ connMaster.instanceOperations().setProperty(
+ Property.REPLICATION_PEERS.getKey() + peerClusterName,
+ ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
+ AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
+
+ String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
+
+ // Create two tables on the master and two on the peer, asserting each one received a table id
+ connMaster.tableOperations().create(masterTable1);
+ String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
+ Assert.assertNotNull(masterTableId1);
+
+ connMaster.tableOperations().create(masterTable2);
+ String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
+ Assert.assertNotNull(masterTableId2);
+
+ connPeer.tableOperations().create(peerTable1);
+ String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
+ Assert.assertNotNull(peerTableId1);
+
+ connPeer.tableOperations().create(peerTable2);
+ String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
+ Assert.assertNotNull(peerTableId2);
+
+ // Grant the peer user write permission on both peer tables
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
+
+ // Enable replication on each master table, targeting peerClusterName with the id of the matching peer table
+ connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);
+
+ connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId2);
+
+ // Write rows prefixed with the table name to masterTable1, counting every key-value written
+ BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
+ long masterTable1Records = 0L;
+ for (int rows = 0; rows < 2500; rows++) {
+ Mutation m = new Mutation(masterTable1 + rows);
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ masterTable1Records++;
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ // Write rows prefixed with the table name to masterTable2, counting every key-value written
+ bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
+ long masterTable2Records = 0L;
+ for (int rows = 0; rows < 2500; rows++) {
+ Mutation m = new Mutation(masterTable2 + rows);
+ for (int cols = 0; cols < 100; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ masterTable2Records++;
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ log.info("Wrote all data to master cluster");
+
+ Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
+ masterTable2);
+
+ while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
+ Thread.sleep(500);
+ }
+
+ // Restart the tserver to force a close on the WAL
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
+ }
+ cluster.exec(TabletServer.class);
+
+ log.info("Restarted the tserver");
+
+ // Read the data -- the tserver is back up and running
+ for (@SuppressWarnings("unused")
+ Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
+
+ // Wait for both tables to be replicated
+ log.info("Waiting for {} for {}", filesFor1, masterTable1);
+ connMaster.replicationOperations().drain(masterTable1, filesFor1);
+
+ log.info("Waiting for {} for {}", filesFor2, masterTable2);
+ connMaster.replicationOperations().drain(masterTable2, filesFor2);
+
+ long countTable = 0L;
+ for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
+ countTable++;
+ Assert.assertTrue("Found unexpected key-value " + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+ .startsWith(masterTable1));
+ }
+
+ log.info("Found {} records in {}", countTable, peerTable1);
+ Assert.assertEquals(masterTable1Records, countTable);
+
+ countTable = 0L;
+ for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
+ countTable++;
+ Assert.assertTrue("Found unexpected key-value " + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
+ .startsWith(masterTable2));
+ }
+
+ log.info("Found {} records in {}", countTable, peerTable2);
+ Assert.assertEquals(masterTable2Records, countTable);
+
+ } finally {
+ peer1Cluster.stop();
+ }
+ }
+
+ @Test(timeout = 60 * 5000)
+ public void dataWasReplicatedToThePeerWithoutDrain() throws Exception {
+ MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
+ ROOT_PASSWORD);
+ peerCfg.setNumTservers(1);
+ peerCfg.setInstanceName("peer");
+ peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
+ peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
+ peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
+ peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
MiniAccumuloClusterImpl peerCluster = peerCfg.build();
peerCluster.start();
@@ -169,7 +503,7 @@ public class ReplicationIT extends ConfigurableMacIT {
}
@Test(timeout = 60 * 5000)
- public void dataReplicatedToCorrectTable() throws Exception {
+ public void dataReplicatedToCorrectTableWithoutDrain() throws Exception {
MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
ROOT_PASSWORD);
peerCfg.setNumTservers(1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
deleted file mode 100644
index 0afbc05..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Iterables;
-import com.google.common.net.HostAndPort;
-
-/**
- *
- */
-public class ReplicationPortAdvertisementIT extends ConfigurableMacIT {
- private static final Logger log = LoggerFactory.getLogger(ReplicationPortAdvertisementIT.class);
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(2);
- }
-
- @Test
- public void tserverReplicationServicePortsAreAdvertised() throws Exception {
- // Wait for the cluster to be up
- Connector conn = getConnector();
- Instance inst = conn.getInstance();
-
- // Wait for a tserver to come up to fulfill this request
- conn.tableOperations().create("foo");
- Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
- Assert.assertEquals(0, Iterables.size(s));
-
- ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
- Set<String> tserverHost = new HashSet<>();
- tserverHost.addAll(zreader.getChildren(ZooUtil.getRoot(inst) + Constants.ZTSERVERS));
-
- Set<HostAndPort> replicationServices = new HashSet<>();
-
- for (String tserver : tserverHost) {
- try {
- byte[] portData = zreader.getData(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_TSERVERS + "/" + tserver, null);
- HostAndPort replAddress = HostAndPort.fromString(new String(portData, StandardCharsets.UTF_8));
- replicationServices.add(replAddress);
- } catch (Exception e) {
- log.error("Could not find port for {}", tserver, e);
- Assert.fail("Did not find replication port advertisement for " + tserver);
- }
- }
-
- // Each tserver should also have equial replicaiton services running internally
- Assert.assertEquals("Expected an equal number of replication servicers and tservers", tserverHost.size(), replicationServices.size());
- }
-
- @Test
- public void masterReplicationServicePortsAreAdvertised() throws Exception {
- // Wait for the cluster to be up
- Connector conn = getConnector();
- Instance inst = conn.getInstance();
-
- // Wait for a tserver to come up to fulfill this request
- conn.tableOperations().create("foo");
- Scanner s = conn.createScanner("foo", Authorizations.EMPTY);
- Assert.assertEquals(0, Iterables.size(s));
-
- ZooReader zreader = new ZooReader(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut());
-
- // Should have one master instance
- Assert.assertEquals(1, inst.getMasterLocations().size());
-
- // Get the master thrift service addr
- String masterAddr = Iterables.getOnlyElement(inst.getMasterLocations());
-
- // Get the master replication coordinator addr
- String replCoordAddr = new String(zreader.getData(ZooUtil.getRoot(inst) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR, null), StandardCharsets.UTF_8);
-
- // They shouldn't be the same
- Assert.assertNotEquals(masterAddr, replCoordAddr);
-
- // Neither should be zero as the port
- Assert.assertNotEquals(0, HostAndPort.fromString(masterAddr).getPort());
- Assert.assertNotEquals(0, HostAndPort.fromString(replCoordAddr).getPort());
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
deleted file mode 100644
index c7c36e8..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
+++ /dev/null
@@ -1,402 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.PartialKey;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.master.replication.SequentialWorkAssigner;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.accumulo.tserver.TabletServer;
-import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReplicationSequentialIT extends ConfigurableMacIT {
- private static final Logger log = LoggerFactory.getLogger(ReplicationSequentialIT.class);
-
- private ExecutorService executor;
-
- @Before
- public void createExecutor() {
- executor = Executors.newSingleThreadExecutor();
- }
-
- @After
- public void stopExecutor() {
- if (null != executor) {
- executor.shutdownNow();
- }
- }
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "2M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
- cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
- cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
- cfg.setProperty(Property.REPLICATION_NAME, "master");
- cfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- @Test(timeout = 60 * 5000)
- public void dataWasReplicatedToThePeer() throws Exception {
- MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
- ROOT_PASSWORD);
- peerCfg.setNumTservers(1);
- peerCfg.setInstanceName("peer");
- peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
- peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
- MiniAccumuloClusterImpl peerCluster = peerCfg.build();
-
- peerCluster.start();
-
- try {
- final Connector connMaster = getConnector();
- final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
-
- ReplicationTable.create(connMaster);
-
- String peerUserName = "peer", peerPassword = "foo";
-
- String peerClusterName = "peer";
-
- connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-
- connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
- connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
-
- // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
- connMaster.instanceOperations().setProperty(
- Property.REPLICATION_PEERS.getKey() + peerClusterName,
- ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
- AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
-
- final String masterTable = "master", peerTable = "peer";
-
- connMaster.tableOperations().create(masterTable);
- String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
- Assert.assertNotNull(masterTableId);
-
- connPeer.tableOperations().create(peerTable);
- String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
- Assert.assertNotNull(peerTableId);
-
- connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
-
- // Replicate this table to the peerClusterName in a table with the peerTableId table id
- connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
- connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
-
- // Write some data to table1
- BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
- for (int rows = 0; rows < 5000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 100; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- log.info("Wrote all data to master cluster");
-
- log.debug("");
- for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
- if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- } else {
- log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
- }
- }
-
- final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
-
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
- cluster.exec(TabletServer.class);
-
- log.info("TabletServer restarted");
- for (@SuppressWarnings("unused")
- Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
- log.info("TabletServer is online");
-
- log.info("");
- log.info("Fetching metadata records:");
- for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
- if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- } else {
- log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
- }
- }
-
- log.info("");
- log.info("Fetching replication records:");
- for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- }
-
- Future<Boolean> future = executor.submit(new Callable<Boolean>() {
-
- @Override
- public Boolean call() throws Exception {
- connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
- log.info("Drain completed");
- return true;
- }
-
- });
-
- try {
- future.get(30, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- future.cancel(true);
- Assert.fail("Drain did not finish within 30 seconds");
- }
-
- log.info("drain completed");
-
- log.info("");
- log.info("Fetching metadata records:");
- for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
- if (ReplicationSection.COLF.equals(kv.getKey().getColumnFamily())) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- } else {
- log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
- }
- }
-
- log.info("");
- log.info("Fetching replication records:");
- for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
- log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
- }
-
- Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
- Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
- Entry<Key,Value> masterEntry = null, peerEntry = null;
- while (masterIter.hasNext() && peerIter.hasNext()) {
- masterEntry = masterIter.next();
- peerEntry = peerIter.next();
- Assert.assertEquals(masterEntry.getKey() + " was not equal to " + peerEntry.getKey(), 0,
- masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
- Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
- }
-
- log.info("Last master entry: " + masterEntry);
- log.info("Last peer entry: " + peerEntry);
-
- Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
- Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
- } finally {
- peerCluster.stop();
- }
- }
-
- @Test(timeout = 60 * 5000)
- public void dataReplicatedToCorrectTable() throws Exception {
- MiniAccumuloConfigImpl peerCfg = new MiniAccumuloConfigImpl(createTestDir(this.getClass().getName() + "_" + this.testName.getMethodName() + "_peer"),
- ROOT_PASSWORD);
- peerCfg.setNumTservers(1);
- peerCfg.setInstanceName("peer");
- peerCfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- peerCfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
- peerCfg.setProperty(Property.REPLICATION_NAME, "peer");
- peerCfg.setProperty(Property.REPLICATION_WORK_ASSIGNER, SequentialWorkAssigner.class.getName());
- MiniAccumuloClusterImpl peer1Cluster = peerCfg.build();
-
- peer1Cluster.start();
-
- try {
- Connector connMaster = getConnector();
- Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
-
- String peerClusterName = "peer";
- String peerUserName = "peer", peerPassword = "foo";
-
- // Create local user
- connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-
- connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
- connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
-
- // ...peer = AccumuloReplicaSystem,instanceName,zookeepers
- connMaster.instanceOperations().setProperty(
- Property.REPLICATION_PEERS.getKey() + peerClusterName,
- ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
- AccumuloReplicaSystem.buildConfiguration(peer1Cluster.getInstanceName(), peer1Cluster.getZooKeepers())));
-
- String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
-
- // Create tables
- connMaster.tableOperations().create(masterTable1);
- String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
- Assert.assertNotNull(masterTableId1);
-
- connMaster.tableOperations().create(masterTable2);
- String masterTableId2 = connMaster.tableOperations().tableIdMap().get(masterTable2);
- Assert.assertNotNull(masterTableId2);
-
- connPeer.tableOperations().create(peerTable1);
- String peerTableId1 = connPeer.tableOperations().tableIdMap().get(peerTable1);
- Assert.assertNotNull(peerTableId1);
-
- connPeer.tableOperations().create(peerTable2);
- String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
- Assert.assertNotNull(peerTableId2);
-
- // Grant write permission
- connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
- connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
-
- // Replicate this table to the peerClusterName in a table with the peerTableId table id
- connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
- connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);
-
- connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION.getKey(), "true");
- connMaster.tableOperations().setProperty(masterTable2, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId2);
-
- // Write some data to table1
- BatchWriter bw = connMaster.createBatchWriter(masterTable1, new BatchWriterConfig());
- long masterTable1Records = 0l;
- for (int rows = 0; rows < 2500; rows++) {
- Mutation m = new Mutation(masterTable1 + rows);
- for (int cols = 0; cols < 100; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- masterTable1Records++;
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Write some data to table2
- bw = connMaster.createBatchWriter(masterTable2, new BatchWriterConfig());
- long masterTable2Records = 0l;
- for (int rows = 0; rows < 2500; rows++) {
- Mutation m = new Mutation(masterTable2 + rows);
- for (int cols = 0; cols < 100; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- masterTable2Records++;
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- log.info("Wrote all data to master cluster");
-
- Set<String> filesFor1 = connMaster.replicationOperations().referencedFiles(masterTable1), filesFor2 = connMaster.replicationOperations().referencedFiles(
- masterTable2);
-
- while (!connMaster.tableOperations().exists(ReplicationTable.NAME)) {
- Thread.sleep(500);
- }
-
- // Restart the tserver to force a close on the WAL
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
- cluster.exec(TabletServer.class);
-
- log.info("Restarted the tserver");
-
- // Read the data -- the tserver is back up and running
- for (@SuppressWarnings("unused")
- Entry<Key,Value> entry : connMaster.createScanner(masterTable1, Authorizations.EMPTY)) {}
-
- // Wait for both tables to be replicated
- log.info("Waiting for {} for {}", filesFor1, masterTable1);
- connMaster.replicationOperations().drain(masterTable1, filesFor1);
-
- log.info("Waiting for {} for {}", filesFor2, masterTable2);
- connMaster.replicationOperations().drain(masterTable2, filesFor2);
-
- long countTable = 0l;
- for (Entry<Key,Value> entry : connPeer.createScanner(peerTable1, Authorizations.EMPTY)) {
- countTable++;
- Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
- .startsWith(masterTable1));
- }
-
- log.info("Found {} records in {}", countTable, peerTable1);
- Assert.assertEquals(masterTable1Records, countTable);
-
- countTable = 0l;
- for (Entry<Key,Value> entry : connPeer.createScanner(peerTable2, Authorizations.EMPTY)) {
- countTable++;
- Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
- .startsWith(masterTable2));
- }
-
- log.info("Found {} records in {}", countTable, peerTable2);
- Assert.assertEquals(masterTable2Records, countTable);
-
- } finally {
- peer1Cluster.stop();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
deleted file mode 100644
index 62c09f5..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSourceOnlyIT.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-
-/**
- * Integration Tests that attempt to evaluate the accuracy of the internal bookkeeping performed on the accumulo "master" instance. Does not send data to any
- * remote instance, merely tracks what is stored locally.
- */
-public class ReplicationSourceOnlyIT extends ConfigurableMacIT {
- @Override
- public int defaultTimeoutSeconds() {
- return 300;
- }
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
- Multimap<String,String> logs = HashMultimap.create();
- Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- scanner.fetchColumnFamily(LogColumnFamily.NAME);
- scanner.setRange(new Range());
- for (Entry<Key,Value> entry : scanner) {
- if (Thread.interrupted()) {
- return logs;
- }
-
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
-
- for (String log : logEntry.logSet) {
- // Need to normalize the log file from LogEntry
- logs.put(new Path(log).toString(), logEntry.extent.getTableId().toString());
- }
- }
- return logs;
- }
-
- @Test
- public void replicationEntriesPrecludeWalDeletion() throws Exception {
- final Connector conn = getConnector();
- String table1 = "table1", table2 = "table2", table3 = "table3";
- final Multimap<String,String> logs = HashMultimap.create();
- final AtomicBoolean keepRunning = new AtomicBoolean(true);
-
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- // Should really be able to interrupt here, but the Scanner throws a fit to the logger
- // when that happens
- while (keepRunning.get()) {
- try {
- logs.putAll(getLogs(conn));
- } catch (TableNotFoundException e) {
- log.error("Metadata table doesn't exist");
- }
- }
- }
-
- });
-
- t.start();
-
- conn.tableOperations().create(table1);
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- Thread.sleep(1000);
-
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- conn.tableOperations().create(table2);
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- Thread.sleep(1000);
-
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- conn.tableOperations().create(table3);
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- Thread.sleep(1000);
-
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Force a write to metadata for the data written
- for (String table : Arrays.asList(table1, table2, table3)) {
- conn.tableOperations().flush(table, null, null, true);
- }
-
- keepRunning.set(false);
- t.join(5000);
-
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Set<String> replFiles = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- replFiles.add(entry.getKey().getRow().toString());
- }
-
- // We might have a WAL that was used solely for the replication table
- // We want to remove that from our list as it should not appear in the replication table
- String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
- Iterator<Entry<String,String>> observedLogs = logs.entries().iterator();
- while (observedLogs.hasNext()) {
- Entry<String,String> observedLog = observedLogs.next();
- if (replicationTableId.equals(observedLog.getValue())) {
- observedLogs.remove();
- }
- }
-
- // We should have *some* reference to each log that was seen in the metadata table
- // They might not yet all be closed though (might be newfile)
- Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
-
- for (String replFile : replFiles) {
- Path p = new Path(replFile);
- FileSystem fs = p.getFileSystem(new Configuration());
- Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
deleted file mode 100644
index da874fa..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTablesMacTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.util.ReplicationTableUtil;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-
-/**
- *
- */
-public class ReplicationTablesMacTest extends ConfigurableMacIT {
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- }
-
- @Test
- public void combinerWorksOnMetadata() throws Exception {
- Connector conn = getConnector();
-
- conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
-
- ReplicationTableUtil.configureMetadataTable(conn, MetadataTable.NAME);
-
- Status stat1 = StatusUtil.fileCreated(100);
- Status stat2 = StatusUtil.fileClosed();
-
- BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
- m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat1));
- bw.addMutation(m);
- bw.close();
-
- Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- System.out.println("Printing metadata table");
-
- Status actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
- Assert.assertEquals(stat1, actual);
-
- bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
- m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
- m.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(stat2));
- bw.addMutation(m);
- bw.close();
-
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
-
- actual = Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
- Status expected = Status.newBuilder().setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100).build();
-
- Assert.assertEquals(expected, actual);
- }
-
-}
[5/6] ACCUMULO-378 Test class consolidation
Posted by el...@apache.org.
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
index 7146019..f42c5ad 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationTest.java
@@ -19,41 +19,72 @@ package org.apache.accumulo.test.replication;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.gc.SimpleGarbageCollector;
+import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.minicluster.impl.ProcessReference;
import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.util.ReplicationTableUtil;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.accumulo.tserver.TabletServer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.protobuf.TextFormat;
/**
* Tests for replication that should be run at every build -- basic functionality
*/
public class ReplicationTest extends ConfigurableMacIT {
+ private static final Logger log = LoggerFactory.getLogger(ReplicationTest.class);
@Override
public int defaultTimeoutSeconds() {
@@ -62,10 +93,37 @@ public class ReplicationTest extends ConfigurableMacIT {
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
// Run the master replication loop run frequently
cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
+ // Additional replication, WAL and GC settings for the test cluster. The short values presumably
+ // keep the background work running frequently -- confirm against each Property's documentation.
+ cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "0s");
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+ cfg.setProperty(Property.GC_CYCLE_START, "1s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
+ cfg.setProperty(Property.REPLICATION_NAME, "master");
+ cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
+ cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
cfg.setNumTservers(1);
+ // Register RawLocalFileSystem under the "fs.file.impl" key of the supplied Hadoop configuration.
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+  /**
+   * Scans the log column family of the metadata table and maps every write-ahead log path found there to the ids of the tables whose
+   * tablets reference it. If the calling thread is found to be interrupted, whatever has been gathered so far is returned.
+   *
+   * @param conn
+   *          connector used to scan the metadata table
+   * @return multimap of normalized WAL path to table id
+   * @throws TableNotFoundException
+   *           if the metadata table cannot be scanned
+   */
+  private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
+    final Multimap<String,String> walToTableIds = HashMultimap.create();
+
+    final Scanner metadataScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    metadataScanner.fetchColumnFamily(LogColumnFamily.NAME);
+    metadataScanner.setRange(new Range());
+
+    final Iterator<Entry<Key,Value>> rows = metadataScanner.iterator();
+    while (rows.hasNext()) {
+      final Entry<Key,Value> row = rows.next();
+      if (Thread.interrupted()) {
+        return walToTableIds;
+      }
+
+      final LogEntry parsed = LogEntry.fromKeyValue(row.getKey(), row.getValue());
+      final String tableId = parsed.extent.getTableId().toString();
+
+      for (String walFile : parsed.logSet) {
+        // Need to normalize the log file from LogEntry
+        walToTableIds.put(new Path(walFile).toString(), tableId);
+      }
+    }
+
+    return walToTableIds;
   }
@Test
@@ -300,4 +358,1079 @@ public class ReplicationTest extends ConfigurableMacIT {
Assert.assertFalse("Expected to only find two elements in replication table", iter.hasNext());
}
+  /**
+   * Writes data to three replicated tables while a background thread records every WAL referenced in the metadata table. Afterwards,
+   * every observed WAL (ignoring entries that belong to the replication table itself) must have a status record in the replication
+   * table, and every file named in the replication table must still exist on the filesystem.
+   */
+  @Test
+  public void replicationEntriesPrecludeWalDeletion() throws Exception {
+    final Connector conn = getConnector();
+    String table1 = "table1", table2 = "table2", table3 = "table3";
+    final Multimap<String,String> logs = HashMultimap.create();
+    final AtomicBoolean keepRunning = new AtomicBoolean(true);
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
+        // when that happens
+        while (keepRunning.get()) {
+          try {
+            logs.putAll(getLogs(conn));
+          } catch (TableNotFoundException e) {
+            log.error("Metadata table doesn't exist", e);
+          }
+        }
+      }
+
+    });
+
+    t.start();
+
+    // Always stop the background scanner, even when table setup or the writes below fail;
+    // otherwise the thread would keep scanning the metadata table after this test has ended.
+    try {
+      for (String table : Arrays.asList(table1, table2, table3)) {
+        conn.tableOperations().create(table);
+        conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+        conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+        Thread.sleep(1000);
+
+        // Write some data to the table
+        BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+        for (int rows = 0; rows < 200; rows++) {
+          Mutation m = new Mutation(Integer.toString(rows));
+          for (int cols = 0; cols < 500; cols++) {
+            String value = Integer.toString(cols);
+            m.put(value, "", value);
+          }
+          bw.addMutation(m);
+        }
+
+        bw.close();
+      }
+
+      // Force a write to metadata for the data written
+      for (String table : Arrays.asList(table1, table2, table3)) {
+        conn.tableOperations().flush(table, null, null, true);
+      }
+    } finally {
+      keepRunning.set(false);
+      t.join(5000);
+    }
+
+    // The multimap is not thread-safe: it may only be read once the writer thread has really terminated
+    Assert.assertFalse("Background metadata scanner did not stop in time", t.isAlive());
+
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Set<String> replFiles = new HashSet<>();
+    for (Entry<Key,Value> entry : s) {
+      replFiles.add(entry.getKey().getRow().toString());
+    }
+
+    // We might have a WAL that was used solely for the replication table
+    // We want to remove that from our list as it should not appear in the replication table
+    String replicationTableId = conn.tableOperations().tableIdMap().get(ReplicationTable.NAME);
+    Iterator<Entry<String,String>> observedLogs = logs.entries().iterator();
+    while (observedLogs.hasNext()) {
+      Entry<String,String> observedLog = observedLogs.next();
+      if (replicationTableId.equals(observedLog.getValue())) {
+        observedLogs.remove();
+      }
+    }
+
+    // We should have *some* reference to each log that was seen in the metadata table
+    // They might not yet all be closed though (might be newfile)
+    Assert.assertEquals("Metadata log distribution: " + logs, logs.keySet(), replFiles);
+
+    for (String replFile : replFiles) {
+      Path p = new Path(replFile);
+      FileSystem fs = p.getFileSystem(new Configuration());
+      Assert.assertTrue("File does not exist anymore, it was likely incorrectly garbage collected: " + p, fs.exists(p));
+    }
+  }
+
+  /**
+   * Writes two Status updates to the same replication-section cell of the metadata table and checks that, after
+   * ReplicationTableUtil.configureMetadataTable has been applied, a scan of that section returns a single Status reflecting both updates.
+   */
+  @Test
+  public void combinerWorksOnMetadata() throws Exception {
+    final Connector conn = getConnector();
+
+    conn.securityOperations().grantTablePermission("root", MetadataTable.NAME, TablePermission.WRITE);
+    ReplicationTableUtil.configureMetadataTable(conn, MetadataTable.NAME);
+
+    final Status created = StatusUtil.fileCreated(100);
+    final Status closed = StatusUtil.fileClosed();
+
+    // First update: the "file created" status
+    BatchWriter writer = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    Mutation update = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
+    update.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(created));
+    writer.addMutation(update);
+    writer.close();
+
+    Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(ReplicationSection.getRange());
+
+    Entry<Key,Value> onlyEntry = Iterables.getOnlyElement(scanner);
+    Status observed = Status.parseFrom(onlyEntry.getValue().get());
+    Assert.assertEquals(created, observed);
+
+    // Second update to the very same cell: the "file closed" status
+    writer = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+    update = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wals/tserver+port/uuid");
+    update.put(ReplicationSection.COLF, new Text("1"), ProtobufUtil.toValue(closed));
+    writer.addMutation(update);
+    writer.close();
+
+    scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    scanner.setRange(ReplicationSection.getRange());
+
+    onlyEntry = Iterables.getOnlyElement(scanner);
+    observed = Status.parseFrom(onlyEntry.getValue().get());
+
+    Status.Builder merged = Status.newBuilder();
+    merged.setBegin(0).setEnd(0).setClosed(true).setInfiniteEnd(true).setCreatedTime(100);
+
+    Assert.assertEquals(merged.build(), observed);
+  }
+
+  /**
+   * Recreates the replication table, then writes to and flushes three replicated tables while a background thread continuously scans the
+   * metadata table for WAL references. The test passes when the metadata table and all three tables can still be fully scanned
+   * afterwards, within the method's timeout.
+   */
+  @Test(timeout = 60 * 1000)
+  public void noDeadlock() throws Exception {
+    final Connector conn = getConnector();
+
+    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
+      conn.tableOperations().delete(ReplicationTable.NAME);
+    }
+
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    final Set<String> metadataWals = new HashSet<>();
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
+        // when that happens
+        while (keepRunning.get()) {
+          try {
+            metadataWals.addAll(getLogs(conn).keySet());
+          } catch (Exception e) {
+            // Any failure lands here, not only a missing table, so report what actually happened
+            log.error("Failed to read WAL references from the metadata table", e);
+          }
+        }
+      }
+
+    });
+
+    t.start();
+
+    String table1 = "table1", table2 = "table2", table3 = "table3";
+
+    // Always stop the background scanner, even when setup or the writes below fail;
+    // otherwise the thread would keep scanning the metadata table after this test has ended.
+    try {
+      for (String table : Arrays.asList(table1, table2, table3)) {
+        conn.tableOperations().create(table);
+        conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+        conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+      }
+
+      // Write some data to each table
+      for (String table : Arrays.asList(table1, table2, table3)) {
+        BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+        for (int rows = 0; rows < 200; rows++) {
+          Mutation m = new Mutation(Integer.toString(rows));
+          for (int cols = 0; cols < 500; cols++) {
+            String value = Integer.toString(cols);
+            m.put(value, "", value);
+          }
+          bw.addMutation(m);
+        }
+
+        bw.close();
+      }
+
+      // Flush everything to try to make the replication records
+      for (String table : Arrays.asList(table1, table2, table3)) {
+        conn.tableOperations().flush(table, null, null, true);
+      }
+    } finally {
+      keepRunning.set(false);
+      t.join(5000);
+    }
+
+    // Read every table to the end; the entries themselves are not inspected
+    for (String table : Arrays.asList(MetadataTable.NAME, table1, table2, table3)) {
+      Scanner s = conn.createScanner(table, new Authorizations());
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : s) {}
+    }
+  }
+
+  /**
+   * Records the WALs referenced by a replicated table after two small writes, then writes a much larger mutation, flushes, and polls the
+   * replication table (up to five times, one second apart) until at least one status record exists and every record for one of the
+   * recorded WALs is marked closed. Fails if that never happens.
+   */
+  @Test(timeout = 60000)
+  public void filesClosedAfterUnused() throws Exception {
+    Connector conn = getConnector();
+
+    String table = "table";
+    conn.tableOperations().create(table);
+    String tableId = conn.tableOperations().tableIdMap().get(table);
+
+    Assert.assertNotNull(tableId);
+
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+    conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+    // just sleep
+    conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+        ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "50000"));
+
+    // Write a mutation to make a log file
+    BatchWriter bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    Mutation m = new Mutation("one");
+    m.put("", "", "");
+    bw.addMutation(m);
+    bw.close();
+
+    // Write another to make sure the logger rolls itself?
+    bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    m = new Mutation("three");
+    m.put("", "", "");
+    bw.addMutation(m);
+    bw.close();
+
+    // Remember which WALs the table's tablets reference at this point
+    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+    s.fetchColumnFamily(TabletsSection.LogColumnFamily.NAME);
+    s.setRange(TabletsSection.getRange(tableId));
+    Set<String> wals = new HashSet<>();
+    for (Entry<Key,Value> entry : s) {
+      LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
+      for (String file : logEntry.logSet) {
+        Path p = new Path(file);
+        wals.add(p.toString());
+      }
+    }
+
+    log.warn("Found wals {}", wals);
+
+    // Write five 1MB values in a single mutation
+    bw = conn.createBatchWriter(table, new BatchWriterConfig());
+    m = new Mutation("three");
+    byte[] bytes = new byte[1024 * 1024];
+    m.put("1".getBytes(), new byte[0], bytes);
+    m.put("2".getBytes(), new byte[0], bytes);
+    m.put("3".getBytes(), new byte[0], bytes);
+    m.put("4".getBytes(), new byte[0], bytes);
+    m.put("5".getBytes(), new byte[0], bytes);
+    bw.addMutation(m);
+    bw.close();
+
+    conn.tableOperations().flush(table, null, null, true);
+
+    while (!conn.tableOperations().exists(ReplicationTable.NAME)) {
+      UtilWaitThread.sleep(500);
+    }
+
+    for (int i = 0; i < 5; i++) {
+      // Log the table's current WAL references to help diagnose failures
+      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      s.fetchColumnFamily(LogColumnFamily.NAME);
+      s.setRange(TabletsSection.getRange(tableId));
+      for (Entry<Key,Value> entry : s) {
+        log.info(entry.getKey().toStringNoTruncate() + "=" + entry.getValue());
+      }
+
+      s = ReplicationTable.getScanner(conn);
+      StatusSection.limit(s);
+      Text buff = new Text();
+      // Starts out true for each pass and may only ever flip to false while scanning. Resetting it per
+      // record would let a later record hide an earlier WAL that is still open.
+      boolean allReferencedLogsClosed = true;
+      int recordsFound = 0;
+      for (Entry<Key,Value> e : s) {
+        recordsFound++;
+        StatusSection.getFile(e.getKey(), buff);
+        String file = buff.toString();
+        if (wals.contains(file)) {
+          Status stat = Status.parseFrom(e.getValue().get());
+          if (!stat.getClosed()) {
+            log.info("{} wasn't closed", file);
+            allReferencedLogsClosed = false;
+          }
+        }
+      }
+
+      if (recordsFound > 0 && allReferencedLogsClosed) {
+        return;
+      }
+
+      Thread.sleep(1000);
+    }
+
+    Assert.fail("We had a file that was referenced but didn't get closed");
+  }
+
+ /**
+ * Replicates a single table to one peer ("cluster1") and follows the records that appear in the replication table: first one open
+ * status record with infinite end plus one work record, then, after more data is written and the replication table is compacted,
+ * two status records and eventually two work records. The garbage collector is killed up front so that it cannot close status records.
+ */
+ @Test
+ public void singleTableWithSingleTarget() throws Exception {
+ // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
+ // against expected Status messages.
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+ cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+ }
+
+ Connector conn = getConnector();
+ String table1 = "table1";
+
+ // replication shouldn't exist when we begin
+ Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
+ // Create a table
+ conn.tableOperations().create(table1);
+
+ // Retry the property updates up to five times, sleeping 500ms after each failure; the last failure is rethrown
+ int attempts = 5;
+ while (attempts > 0) {
+ try {
+ // Enable replication on table1
+ conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+ // Replicate table1 to cluster1 in the table with id of '4'
+ conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
+ conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+ ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "100000"));
+ break;
+ } catch (Exception e) {
+ attempts--;
+ if (attempts <= 0) {
+ throw e;
+ }
+ UtilWaitThread.sleep(500);
+ }
+ }
+
+ // Write some data to table1
+ BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+ for (int rows = 0; rows < 2000; rows++) {
+ Mutation m = new Mutation(Integer.toString(rows));
+ for (int cols = 0; cols < 50; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ // Make sure the replication table exists at this point
+ // (polls up to five more times, 200ms apart, when it is not there yet)
+ boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+ attempts = 5;
+ do {
+ if (!exists) {
+ UtilWaitThread.sleep(200);
+ exists = conn.tableOperations().exists(ReplicationTable.NAME);
+ attempts--;
+ }
+ } while (!exists && attempts > 0);
+ Assert.assertTrue("Replication table was never created", exists);
+
+ // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
+ for (int i = 0; i < 5 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
+ UtilWaitThread.sleep(1000);
+ }
+
+ Assert.assertTrue("Combiner was never set on replication table",
+ conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
+
+ // Trigger the minor compaction, waiting for it to finish.
+ // This should write the entry to metadata that the file has data
+ conn.tableOperations().flush(table1, null, null, true);
+
+ // Make sure that we have one status element, should be a new file
+ Scanner s = ReplicationTable.getScanner(conn);
+ StatusSection.limit(s);
+ Entry<Key,Value> entry = null;
+ Status expectedStatus = StatusUtil.openWithUnknownLength();
+ attempts = 5;
+ // This record will move from new to new with infinite length because of the minc (flush)
+ while (null == entry && attempts > 0) {
+ try {
+ entry = Iterables.getOnlyElement(s);
+ Status actual = Status.parseFrom(entry.getValue().get());
+ if (actual.getInfiniteEnd() != expectedStatus.getInfiniteEnd()) {
+ entry = null;
+ // the master process didn't yet fire and write the new mutation, wait for it to do
+ // so and try to read it again
+ Thread.sleep(1000);
+ }
+ } catch (NoSuchElementException e) {
+ // No status record yet: wait and scan again
+ entry = null;
+ Thread.sleep(500);
+ } catch (IllegalArgumentException e) {
+ // saw this contain 2 elements once
+ // Log the status section for diagnosis before failing with the original exception
+ s = ReplicationTable.getScanner(conn);
+ StatusSection.limit(s);
+ for (Entry<Key,Value> content : s) {
+ log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+ }
+ throw e;
+ } finally {
+ // Every pass consumes one attempt, whichever path it took
+ attempts--;
+ }
+ }
+
+ Assert.assertNotNull("Could not find expected entry in replication table", entry);
+ Status actual = Status.parseFrom(entry.getValue().get());
+ Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual), !actual.getClosed() && actual.getInfiniteEnd());
+
+ // Try a couple of times to watch for the work record to be created
+ boolean notFound = true;
+ for (int i = 0; i < 10 && notFound; i++) {
+ s = ReplicationTable.getScanner(conn);
+ WorkSection.limit(s);
+ int elementsFound = Iterables.size(s);
+ if (0 < elementsFound) {
+ Assert.assertEquals(1, elementsFound);
+ notFound = false;
+ }
+ Thread.sleep(500);
+ }
+
+ // If we didn't find the work record, print the contents of the table
+ if (notFound) {
+ s = ReplicationTable.getScanner(conn);
+ for (Entry<Key,Value> content : s) {
+ log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+ }
+ Assert.assertFalse("Did not find the work entry for the status entry", notFound);
+ }
+
+ // Write some more data so that we over-run the single WAL
+ bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+ for (int rows = 0; rows < 2000; rows++) {
+ Mutation m = new Mutation(Integer.toString(rows));
+ for (int cols = 0; cols < 50; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
+ }
+
+ bw.close();
+
+ conn.tableOperations().compact(ReplicationTable.NAME, null, null, true, true);
+
+ // Two status records are expected at this point
+ s = ReplicationTable.getScanner(conn);
+ StatusSection.limit(s);
+ Assert.assertEquals(2, Iterables.size(s));
+
+ // We should eventually get 2 work records recorded, need to account for a potential delay though
+ // might see: status1 -> work1 -> status2 -> (our scans) -> work2
+ notFound = true;
+ for (int i = 0; i < 10 && notFound; i++) {
+ s = ReplicationTable.getScanner(conn);
+ WorkSection.limit(s);
+ int elementsFound = Iterables.size(s);
+ if (2 == elementsFound) {
+ notFound = false;
+ }
+ Thread.sleep(500);
+ }
+
+ // If we didn't find the work record, print the contents of the table
+ if (notFound) {
+ s = ReplicationTable.getScanner(conn);
+ for (Entry<Key,Value> content : s) {
+ log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+ }
+ Assert.assertFalse("Did not find the work entries for the status entries", notFound);
+ }
+ }
+
+  /**
+   * Enables replication of one table to "cluster1" (remote table id "4") and checks that the single work entry which appears in the
+   * replication table carries the column qualifier built from that target and the local table id.
+   */
+  @Test
+  public void correctClusterNameInWorkEntry() throws Exception {
+    final Connector conn = getConnector();
+    final String table1 = "table1";
+
+    // replication shouldn't exist when we begin
+    Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
+    // Create the table under test
+    conn.tableOperations().create(table1);
+
+    // Up to five tries at setting the properties, pausing 500ms after each failure; the last failure is rethrown
+    int attempts = 5;
+    while (attempts > 0) {
+      try {
+        // Enable replication on table1
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+        // Replicate table1 to cluster1 in the table with id of '4'
+        conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
+        break;
+      } catch (Exception e) {
+        attempts--;
+        if (attempts <= 0) {
+          throw e;
+        }
+        UtilWaitThread.sleep(500);
+      }
+    }
+
+    // Write some data to table1
+    final BatchWriter writer = conn.createBatchWriter(table1, new BatchWriterConfig());
+    for (int row = 0; row < 2000; row++) {
+      final Mutation mutation = new Mutation(Integer.toString(row));
+      for (int col = 0; col < 50; col++) {
+        final String cell = Integer.toString(col);
+        mutation.put(cell, "", cell);
+      }
+      writer.addMutation(mutation);
+    }
+    writer.close();
+
+    final String tableId = conn.tableOperations().tableIdMap().get(table1);
+    Assert.assertNotNull("Table ID was null", tableId);
+
+    // Make sure the replication table exists at this point, polling a few times if it does not
+    boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+    attempts = 5;
+    while (!exists && attempts > 0) {
+      UtilWaitThread.sleep(500);
+      exists = conn.tableOperations().exists(ReplicationTable.NAME);
+      attempts--;
+    }
+    Assert.assertTrue("Replication table did not exist", exists);
+
+    // Wait (at most five seconds) until root is able to read the replication table
+    int permissionChecks = 0;
+    while (permissionChecks < 5 && !conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ)) {
+      Thread.sleep(1000);
+      permissionChecks++;
+    }
+
+    Assert.assertTrue(conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
+
+    boolean found = false;
+    Scanner scanner;
+    for (int i = 0; i < 10 && !found; i++) {
+      scanner = ReplicationTable.getScanner(conn);
+      WorkSection.limit(scanner);
+      try {
+        final Entry<Key,Value> workEntry = Iterables.getOnlyElement(scanner);
+        final Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
+        Assert.assertEquals(expectedColqual, workEntry.getKey().getColumnQualifier());
+        found = true;
+      } catch (NoSuchElementException e) {
+        // No work entry yet; fall through to the sleep and scan again
+      } catch (IllegalArgumentException e) {
+        // More than one entry: dump the whole table before failing
+        scanner = ReplicationTable.getScanner(conn);
+        for (Entry<Key,Value> content : scanner) {
+          log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+        }
+        Assert.fail("Found more than one work section entry");
+      }
+
+      Thread.sleep(500);
+    }
+
+    if (!found) {
+      // Dump the whole table to help diagnose the missing work entry
+      scanner = ReplicationTable.getScanner(conn);
+      for (Entry<Key,Value> content : scanner) {
+        log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+      }
+      Assert.fail("Did not find the work entry for the status entry");
+    }
+  }
+
+  /**
+   * With the garbage collector stopped, writes to and compacts three replicated tables while a background thread records the WALs
+   * referenced in the metadata table, then rewrites every status record in the replication table, restarts the tablet servers and
+   * finally starts a garbage collector. The replication records for the observed WALs are then expected to become closed, first in the
+   * metadata table and then in the replication table; each check is retried up to ten times, one second apart.
+   */
+  @Test(timeout = 6 * 60 * 1000)
+  public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
+    Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
+    for (ProcessReference ref : gcProcs) {
+      cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref);
+    }
+
+    final Connector conn = getConnector();
+
+    if (conn.tableOperations().exists(ReplicationTable.NAME)) {
+      conn.tableOperations().delete(ReplicationTable.NAME);
+    }
+
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    final AtomicBoolean keepRunning = new AtomicBoolean(true);
+    final Set<String> metadataWals = new HashSet<>();
+
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        // Should really be able to interrupt here, but the Scanner throws a fit to the logger
+        // when that happens
+        while (keepRunning.get()) {
+          try {
+            metadataWals.addAll(getLogs(conn).keySet());
+          } catch (Exception e) {
+            // Any failure lands here, not only a missing table, so report what actually happened
+            log.error("Failed to read WAL references from the metadata table", e);
+          }
+        }
+      }
+
+    });
+
+    t.start();
+
+    String table1 = "table1", table2 = "table2", table3 = "table3";
+
+    BatchWriter bw;
+    try {
+      for (String table : Arrays.asList(table1, table2, table3)) {
+        conn.tableOperations().create(table);
+        conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION.getKey(), "true");
+        conn.tableOperations().setProperty(table, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
+        if (table1.equals(table)) {
+          // The peer only has to be defined once, right after the first table is set up
+          conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+              ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
+        }
+
+        // Write some data to the table
+        bw = conn.createBatchWriter(table, new BatchWriterConfig());
+        for (int rows = 0; rows < 200; rows++) {
+          Mutation m = new Mutation(Integer.toString(rows));
+          for (int cols = 0; cols < 500; cols++) {
+            String value = Integer.toString(cols);
+            m.put(value, "", value);
+          }
+          bw.addMutation(m);
+        }
+
+        bw.close();
+      }
+
+      // Compact everything to try to make the replication records
+      for (String table : Arrays.asList(table1, table2, table3)) {
+        conn.tableOperations().compact(table, null, null, true, true);
+      }
+    } finally {
+      keepRunning.set(false);
+      t.join(5000);
+    }
+
+    // Checked outside of the finally block so that it cannot replace an exception thrown by the setup above.
+    // metadataWals is not thread-safe and is read below, so the writer thread must be gone by now.
+    Assert.assertFalse(t.isAlive());
+
+    // write a Long.MAX_VALUE into each repl entry
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    Status finishedReplStatus = StatusUtil.replicated(Long.MAX_VALUE);
+    Set<String> filesToWatch = new HashSet<>();
+    Text buff = new Text();
+    for (Entry<Key,Value> entry : s) {
+      StatusSection.getFile(entry.getKey(), buff);
+      filesToWatch.add(buff.toString());
+      Status status = Status.parseFrom(entry.getValue().get());
+      Assert.assertFalse(status.getClosed());
+
+      // Fake that each one is fully replicated
+      Mutation m = new Mutation(entry.getKey().getRow());
+      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(), new Value(finishedReplStatus.toByteArray()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
+    for (Entry<Key,Value> entry : s) {
+      Status status = Status.parseFrom(entry.getValue().get());
+      Assert.assertFalse(status.getClosed());
+
+      // Add a "file created" status stamped with the current time to each entry
+      Mutation m = new Mutation(entry.getKey().getRow());
+      m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(),
+          StatusUtil.fileCreatedValue(System.currentTimeMillis()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    // Kill the tserver(s) and restart them
+    // to ensure that the WALs we previously observed all move to closed.
+    for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+      cluster.killProcess(ServerType.TABLET_SERVER, proc);
+    }
+
+    cluster.exec(TabletServer.class);
+
+    // Make sure we can read all the tables (recovery complete)
+    for (String table : Arrays.asList(table1, table2, table3)) {
+      s = conn.createScanner(table, new Authorizations());
+      for (@SuppressWarnings("unused")
+      Entry<Key,Value> entry : s) {}
+    }
+
+    // Starting the gc will run CloseWriteAheadLogReferences which will first close Statuses
+    // in the metadata table, and then in the replication table
+    Process gc = cluster.exec(SimpleGarbageCollector.class);
+
+    try {
+      boolean allClosed = true;
+
+      // We should either find all closed records or no records
+      // After they're closed, they are candidates for deletion
+      for (int i = 0; i < 10; i++) {
+        // Start every attempt afresh; without this a single unclosed record would turn
+        // all remaining attempts into no-ops
+        allClosed = true;
+
+        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
+        Iterator<Entry<Key,Value>> iter = s.iterator();
+
+        long recordsFound = 0L;
+        while (allClosed && iter.hasNext()) {
+          Entry<Key,Value> entry = iter.next();
+          // NOTE(review): rows in this range start with ReplicationSection.getRowPrefix(), while metadataWals
+          // appears to hold bare WAL paths -- confirm that this lookup can actually match.
+          String wal = entry.getKey().getRow().toString();
+          if (metadataWals.contains(wal)) {
+            Status status = Status.parseFrom(entry.getValue().get());
+            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
+            allClosed &= status.getClosed();
+            recordsFound++;
+          }
+        }
+
+        log.info("Found {} records from the metadata table", recordsFound);
+        if (allClosed) {
+          break;
+        }
+
+        // Give the garbage collector time to close the records before looking again
+        UtilWaitThread.sleep(1000);
+      }
+
+      if (!allClosed) {
+        s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+        s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
+        for (Entry<Key,Value> entry : s) {
+          log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
+        }
+        Assert.fail("Expected all replication records in the metadata table to be closed");
+      }
+
+      for (int i = 0; i < 10; i++) {
+        allClosed = true;
+
+        s = ReplicationTable.getScanner(conn);
+        Iterator<Entry<Key,Value>> iter = s.iterator();
+
+        long recordsFound = 0L;
+        while (allClosed && iter.hasNext()) {
+          Entry<Key,Value> entry = iter.next();
+          String wal = entry.getKey().getRow().toString();
+          if (metadataWals.contains(wal)) {
+            Status status = Status.parseFrom(entry.getValue().get());
+            log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
+            allClosed &= status.getClosed();
+            recordsFound++;
+          }
+        }
+
+        log.info("Found {} records from the replication table", recordsFound);
+        if (allClosed) {
+          break;
+        }
+
+        UtilWaitThread.sleep(1000);
+      }
+
+      if (!allClosed) {
+        s = ReplicationTable.getScanner(conn);
+        StatusSection.limit(s);
+        for (Entry<Key,Value> entry : s) {
+          log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
+        }
+        Assert.fail("Expected all replication records in the replication table to be closed");
+      }
+
+    } finally {
+      gc.destroy();
+      gc.waitFor();
+    }
+
+  }
+
+ /**
+  * Checks that replication bookkeeping is removed once data has been replicated. The test ingests data into a table configured for replication, waits for
+  * the single work entry to show up in the replication table, restarts the tablet server(s), starts the garbage collector, and then polls until both the
+  * replication section of the metadata table and the replication table itself contain no records.
+  *
+  * @throws Exception
+  *           if any client, cluster or scan operation fails
+  */
+ @Test(timeout = 5 * 60 * 1000)
+ public void replicatedStatusEntriesAreDeleted() throws Exception {
+   // Just stop it now, we'll restart it after we restart the tserver
+   for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+     getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+   }
+
+   final Connector conn = getConnector();
+   log.info("Got connector to MAC");
+   String table1 = "table1";
+
+   // replication shouldn't exist when we begin
+   Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
+
+   ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
+   thread.start();
+
+   try {
+     // Create the table that will be replicated
+     conn.tableOperations().create(table1);
+
+     // Setting the properties may fail transiently; retry a few times before giving up
+     int attempts = 5;
+     while (attempts > 0) {
+       try {
+         // Enable replication on table1
+         conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+         // Replicate table1 to cluster1 in the table with id of '4'
+         conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
+         // Use the MockReplicaSystem impl and sleep for 5seconds
+         conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+             ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
+         attempts = 0;
+       } catch (Exception e) {
+         attempts--;
+         if (attempts <= 0) {
+           throw e;
+         }
+         UtilWaitThread.sleep(500);
+       }
+     }
+
+     String tableId = conn.tableOperations().tableIdMap().get(table1);
+     Assert.assertNotNull("Could not determine table id for " + table1, tableId);
+
+     // Write some data to table1
+     BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+     for (int rows = 0; rows < 2000; rows++) {
+       Mutation m = new Mutation(Integer.toString(rows));
+       for (int cols = 0; cols < 50; cols++) {
+         String value = Integer.toString(cols);
+         m.put(value, "", value);
+       }
+       bw.addMutation(m);
+     }
+
+     bw.close();
+
+     // Make sure the replication table exists at this point
+     boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+     attempts = 10;
+     do {
+       if (!exists) {
+         UtilWaitThread.sleep(1000);
+         exists = conn.tableOperations().exists(ReplicationTable.NAME);
+         attempts--;
+       }
+     } while (!exists && attempts > 0);
+     Assert.assertTrue("Replication table did not exist", exists);
+
+     // Grant ourselves the write permission for later
+     conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+     // Find the WorkSection record that will be created for that data we ingested
+     boolean notFound = true;
+     Scanner s;
+     for (int i = 0; i < 10 && notFound; i++) {
+       try {
+         s = ReplicationTable.getScanner(conn);
+         WorkSection.limit(s);
+         Entry<Key,Value> e = Iterables.getOnlyElement(s);
+         Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
+         Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
+         notFound = false;
+       } catch (NoSuchElementException e) {
+         // No work entry yet -- intentionally ignored so that we sleep below and try again
+       } catch (IllegalArgumentException e) {
+         // Somehow we got more than one element. Log what they were
+         s = ReplicationTable.getScanner(conn);
+         for (Entry<Key,Value> content : s) {
+           log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+         }
+         Assert.fail("Found more than one work section entry");
+       } catch (RuntimeException e) {
+         // Catch a propagation issue, fail if it's not what we expect
+         Throwable cause = e.getCause();
+         if (cause instanceof AccumuloSecurityException) {
+           AccumuloSecurityException sec = (AccumuloSecurityException) cause;
+           switch (sec.getSecurityErrorCode()) {
+             case PERMISSION_DENIED:
+               // retry -- the grant didn't happen yet
+               log.warn("Sleeping because permission was denied");
+               // Without this break the case falls through to 'default' and rethrows instead of retrying
+               break;
+             default:
+               throw e;
+           }
+         } else {
+           throw e;
+         }
+       }
+
+       Thread.sleep(1000);
+     }
+
+     if (notFound) {
+       // Dump whatever is in the replication table to help diagnose the failure
+       s = ReplicationTable.getScanner(conn);
+       for (Entry<Key,Value> content : s) {
+         log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+       }
+       Assert.fail("Did not find the work entry for the status entry");
+     }
+
+     /**
+      * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Compacting the table should close all
+      * open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the metadata table
+      * anymore
+      */
+
+     log.info("Killing tserver");
+     // Kill the tserver(s) and restart them
+     // to ensure that the WALs we previously observed all move to closed.
+     for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+       cluster.killProcess(ServerType.TABLET_SERVER, proc);
+     }
+
+     log.info("Starting tserver");
+     cluster.exec(TabletServer.class);
+
+     log.info("Waiting to read tables");
+
+     // Make sure we can read all the tables (recovery complete)
+     for (String table : new String[] {MetadataTable.NAME, table1}) {
+       s = conn.createScanner(table, new Authorizations());
+       for (@SuppressWarnings("unused")
+       Entry<Key,Value> entry : s) {}
+     }
+
+     log.info("Checking for replication entries in replication");
+     // Then we need to get those records over to the replication table
+     boolean foundResults = false;
+     for (int i = 0; i < 5; i++) {
+       s = ReplicationTable.getScanner(conn);
+       int count = 0;
+       for (Entry<Key,Value> entry : s) {
+         count++;
+         log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+       }
+       if (count > 0) {
+         foundResults = true;
+         break;
+       }
+       Thread.sleep(1000);
+     }
+
+     Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
+
+     getCluster().exec(SimpleGarbageCollector.class);
+
+     // Wait for a bit since the GC has to run (should be running after a one second delay)
+     Thread.sleep(5000);
+
+     // We expect no records in the metadata table after compaction. We have to poll
+     // because we have to wait for the StatusMaker's next iteration which will clean
+     // up the dangling *closed* records after we create the record in the replication table.
+     // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
+     log.info("Checking metadata table for replication entries");
+     foundResults = true;
+     for (int i = 0; i < 5; i++) {
+       s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+       s.setRange(ReplicationSection.getRange());
+       long size = 0;
+       for (Entry<Key,Value> e : s) {
+         size++;
+         log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
+       }
+       if (size == 0) {
+         foundResults = false;
+         break;
+       }
+       Thread.sleep(1000);
+       log.info("");
+     }
+
+     Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
+
+     /**
+      * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
+      * deleted after replication occurs
+      */
+
+     int recordsFound = 0;
+     for (int i = 0; i < 10; i++) {
+       s = ReplicationTable.getScanner(conn);
+       recordsFound = 0;
+       for (Entry<Key,Value> entry : s) {
+         recordsFound++;
+         log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
+       }
+
+       if (0 == recordsFound) {
+         break;
+       } else {
+         Thread.sleep(1000);
+         log.info("");
+       }
+     }
+
+     Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
+   } finally {
+     // Always stop the table-printer thread, even when an assertion above fails
+     thread.interrupt();
+     thread.join(5000);
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
deleted file mode 100644
index 7d9c537..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ /dev/null
@@ -1,554 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.fate.util.UtilWaitThread;
-import org.apache.accumulo.gc.SimpleGarbageCollector;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.accumulo.tserver.TabletServer;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Iterables;
-import com.google.protobuf.TextFormat;
-
-/**
- *
- */
-public class ReplicationWithGCIT extends ConfigurableMacIT {
- private static final Logger log = LoggerFactory.getLogger(ReplicationWithGCIT.class);
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- cfg.setNumTservers(1);
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
- cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "0");
- cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
- cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
- cfg.setProperty(Property.REPLICATION_NAME, "master");
- cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
- cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
- cfg.setNumTservers(1);
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- }
-
- private Set<String> metadataWals(Connector conn) throws Exception {
- Scanner s = conn.createScanner(MetadataTable.NAME, new Authorizations());
- s.fetchColumnFamily(LogColumnFamily.NAME);
- Set<String> metadataWals = new HashSet<>();
- for (Entry<Key,Value> entry : s) {
- LogEntry logEntry = LogEntry.fromKeyValue(entry.getKey(), entry.getValue());
- for (String log : logEntry.logSet) {
- metadataWals.add(new Path(log).toString());
- }
- }
- return metadataWals;
- }
-
- @Test(timeout = 6 * 60 * 1000)
- public void replicationRecordsAreClosedAfterGarbageCollection() throws Exception {
- Collection<ProcessReference> gcProcs = cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR);
- for (ProcessReference ref : gcProcs) {
- cluster.killProcess(ServerType.GARBAGE_COLLECTOR, ref);
- }
-
- final Connector conn = getConnector();
-
- if (conn.tableOperations().exists(ReplicationTable.NAME)) {
- conn.tableOperations().delete(ReplicationTable.NAME);
- }
-
- ReplicationTable.create(conn);
- conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
- final AtomicBoolean keepRunning = new AtomicBoolean(true);
- final Set<String> metadataWals = new HashSet<>();
-
- Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- // Should really be able to interrupt here, but the Scanner throws a fit to the logger
- // when that happens
- while (keepRunning.get()) {
- try {
- metadataWals.addAll(metadataWals(conn));
- } catch (Exception e) {
- log.error("Metadata table doesn't exist");
- }
- }
- }
-
- });
-
- t.start();
-
- String table1 = "table1", table2 = "table2", table3 = "table3";
-
- conn.tableOperations().create(table1);
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
- conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
- ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, null));
-
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- conn.tableOperations().create(table2);
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table2, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-
- // Write some data to table2
- bw = conn.createBatchWriter(table2, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- conn.tableOperations().create(table3);
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION.getKey(), "true");
- conn.tableOperations().setProperty(table3, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "1");
-
- // Write some data to table3
- bw = conn.createBatchWriter(table3, new BatchWriterConfig());
- for (int rows = 0; rows < 200; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Flush everything to try to make the replication records
- for (String table : Arrays.asList(table1, table2, table3)) {
- conn.tableOperations().flush(table, null, null, true);
- }
-
- keepRunning.set(false);
- t.join(5000);
-
- // write a Long.MAX_VALUE into each repl entry
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
- Status finishedReplStatus = StatusUtil.replicated(Long.MAX_VALUE);
- for (Entry<Key,Value> entry : s) {
- Status status = Status.parseFrom(entry.getValue().get());
- Assert.assertFalse(status.getClosed());
-
- // Fake that each one is fully replicated
- Mutation m = new Mutation(entry.getKey().getRow());
- m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(), new Value(finishedReplStatus.toByteArray()));
- bw.addMutation(m);
- }
- bw.close();
-
- s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
- for (Entry<Key,Value> entry : s) {
- Status status = Status.parseFrom(entry.getValue().get());
- Assert.assertFalse(status.getClosed());
-
- // Fake that each one is fully replicated
- Mutation m = new Mutation(entry.getKey().getRow());
- m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(),
- StatusUtil.fileCreatedValue(System.currentTimeMillis()));
- bw.addMutation(m);
- }
- bw.close();
-
- // Kill the tserver(s) and restart them
- // to ensure that the WALs we previously observed all move to closed.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
-
- cluster.exec(TabletServer.class);
-
- // Starting the gc will run CloseWriteAheadLogReferences which will first close Statuses
- // in the metadata table, and then in the replication table
- Process gc = cluster.exec(SimpleGarbageCollector.class);
-
- // Make sure we can read all the tables (recovery complete)
- for (String table : Arrays.asList(table1, table2, table3)) {
- s = conn.createScanner(table, new Authorizations());
- for (@SuppressWarnings("unused")
- Entry<Key,Value> entry : s) {}
- }
-
- try {
- boolean allClosed = true;
-
- // We should either find all closed records or no records
- // After they're closed, they are candidates for deletion
- for (int i = 0; i < 10; i++) {
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
- Iterator<Entry<Key,Value>> iter = s.iterator();
-
- long recordsFound = 0l;
- while (allClosed && iter.hasNext()) {
- Entry<Key,Value> entry = iter.next();
- String wal = entry.getKey().getRow().toString();
- if (metadataWals.contains(wal)) {
- Status status = Status.parseFrom(entry.getValue().get());
- log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
- allClosed &= status.getClosed();
- recordsFound++;
- }
- }
-
- log.info("Found {} records from the metadata table", recordsFound);
- if (allClosed) {
- break;
- }
- }
-
- if (!allClosed) {
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(Range.prefix(ReplicationSection.getRowPrefix()));
- for (Entry<Key,Value> entry : s) {
- log.info(entry.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(entry.getValue().get())));
- }
- Assert.fail("Expected all replication records in the metadata table to be closed");
- }
-
- for (int i = 0; i < 10; i++) {
- allClosed = true;
-
- s = ReplicationTable.getScanner(conn);
- Iterator<Entry<Key,Value>> iter = s.iterator();
-
- long recordsFound = 0l;
- while (allClosed && iter.hasNext()) {
- Entry<Key,Value> entry = iter.next();
- String wal = entry.getKey().getRow().toString();
- if (metadataWals.contains(wal)) {
- Status status = Status.parseFrom(entry.getValue().get());
- log.info("{}={}", entry.getKey().toStringNoTruncate(), ProtobufUtil.toString(status));
- allClosed &= status.getClosed();
- recordsFound++;
- }
- }
-
- log.info("Found {} records from the replication table", recordsFound);
- if (allClosed) {
- break;
- }
-
- UtilWaitThread.sleep(1000);
- }
-
- if (!allClosed) {
- s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- for (Entry<Key,Value> entry : s) {
- log.info(entry.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(entry.getValue().get())));
- }
- Assert.fail("Expected all replication records in the replication table to be closed");
- }
-
- } finally {
- gc.destroy();
- gc.waitFor();
- }
-
- }
-
- @Test(timeout = 5 * 60 * 1000)
- public void replicatedStatusEntriesAreDeleted() throws Exception {
- // Just stop it now, we'll restart it after we restart the tserver
- for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
- getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
- }
-
- final Connector conn = getConnector();
- log.info("Got connector to MAC");
- String table1 = "table1";
-
- // replication shouldn't exist when we begin
- Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
-
- ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
- thread.start();
-
- try {
- // Create two tables
- conn.tableOperations().create(table1);
-
- int attempts = 5;
- while (attempts > 0) {
- try {
- // Enable replication on table1
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- // Replicate table1 to cluster1 in the table with id of '4'
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
- // Use the MockReplicaSystem impl and sleep for 5seconds
- conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
- ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
- attempts = 0;
- } catch (Exception e) {
- attempts--;
- if (attempts <= 0) {
- throw e;
- }
- UtilWaitThread.sleep(500);
- }
- }
-
- String tableId = conn.tableOperations().tableIdMap().get(table1);
- Assert.assertNotNull("Could not determine table id for " + table1, tableId);
-
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Make sure the replication table exists at this point
- boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
- attempts = 10;
- do {
- if (!exists) {
- UtilWaitThread.sleep(1000);
- exists = conn.tableOperations().exists(ReplicationTable.NAME);
- attempts--;
- }
- } while (!exists && attempts > 0);
- Assert.assertTrue("Replication table did not exist", exists);
-
- // Grant ourselves the write permission for later
- conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
-
- // Find the WorkSection record that will be created for that data we ingested
- boolean notFound = true;
- Scanner s;
- for (int i = 0; i < 10 && notFound; i++) {
- try {
- s = ReplicationTable.getScanner(conn);
- WorkSection.limit(s);
- Entry<Key,Value> e = Iterables.getOnlyElement(s);
- Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
- Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
- notFound = false;
- } catch (NoSuchElementException e) {
-
- } catch (IllegalArgumentException e) {
- // Somehow we got more than one element. Log what they were
- s = ReplicationTable.getScanner(conn);
- for (Entry<Key,Value> content : s) {
- log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
- }
- Assert.fail("Found more than one work section entry");
- } catch (RuntimeException e) {
- // Catch a propagation issue, fail if it's not what we expect
- Throwable cause = e.getCause();
- if (cause instanceof AccumuloSecurityException) {
- AccumuloSecurityException sec = (AccumuloSecurityException) cause;
- switch (sec.getSecurityErrorCode()) {
- case PERMISSION_DENIED:
- // retry -- the grant didn't happen yet
- log.warn("Sleeping because permission was denied");
- default:
- throw e;
- }
- } else {
- throw e;
- }
- }
-
- Thread.sleep(1000);
- }
-
- if (notFound) {
- s = ReplicationTable.getScanner(conn);
- for (Entry<Key,Value> content : s) {
- log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
- }
- Assert.assertFalse("Did not find the work entry for the status entry", notFound);
- }
-
- /**
- * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Compacting the table should close all
- * open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the metadata table
- * anymore
- */
-
- log.info("Killing tserver");
- // Kill the tserver(s) and restart them
- // to ensure that the WALs we previously observed all move to closed.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
-
- log.info("Starting tserver");
- cluster.exec(TabletServer.class);
-
- log.info("Waiting to read tables");
-
- // Make sure we can read all the tables (recovery complete)
- for (String table : new String[] {MetadataTable.NAME, table1}) {
- s = conn.createScanner(table, new Authorizations());
- for (@SuppressWarnings("unused")
- Entry<Key,Value> entry : s) {}
- }
-
- log.info("Checking for replication entries in replication");
- // Then we need to get those records over to the replication table
- boolean foundResults = false;
- for (int i = 0; i < 5; i++) {
- s = ReplicationTable.getScanner(conn);
- int count = 0;
- for (Entry<Key,Value> entry : s) {
- count++;
- log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
- }
- if (count > 0) {
- foundResults = true;
- break;
- }
- Thread.sleep(1000);
- }
-
- Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
-
- getCluster().exec(SimpleGarbageCollector.class);
-
- // Wait for a bit since the GC has to run (should be running after a one second delay)
- Thread.sleep(5000);
-
- // We expect no records in the metadata table after compaction. We have to poll
- // because we have to wait for the StatusMaker's next iteration which will clean
- // up the dangling *closed* records after we create the record in the replication table.
- // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
- log.info("Checking metadata table for replication entries");
- foundResults = true;
- for (int i = 0; i < 5; i++) {
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- long size = 0;
- for (Entry<Key,Value> e : s) {
- size++;
- log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
- }
- if (size == 0) {
- foundResults = false;
- break;
- }
- Thread.sleep(1000);
- log.info("");
- }
-
- Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
-
- /**
- * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
- * deleted after repliation occurs
- */
-
- int recordsFound = 0;
- for (int i = 0; i < 10; i++) {
- s = ReplicationTable.getScanner(conn);
- recordsFound = 0;
- for (Entry<Key,Value> entry : s) {
- recordsFound++;
- log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
- }
-
- if (0 == recordsFound) {
- break;
- } else {
- Thread.sleep(1000);
- log.info("");
- }
- }
-
- Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
- } finally {
- thread.join(200);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/9d9b5ed2/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
deleted file mode 100644
index 70d6ca1..0000000
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.test.replication;
-
-import java.util.Map.Entry;
-import java.util.NoSuchElementException;
-
-import org.apache.accumulo.core.client.BatchWriter;
-import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
-import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
-import org.apache.accumulo.core.replication.ReplicationTarget;
-import org.apache.accumulo.core.replication.StatusUtil;
-import org.apache.accumulo.core.replication.proto.Replication.Status;
-import org.apache.accumulo.core.security.TablePermission;
-import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.minicluster.ServerType;
-import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.minicluster.impl.ProcessReference;
-import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.test.functional.ConfigurableMacIT;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.io.Text;
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Iterables;
-
-/**
- * Tests for replication that should be run at every build -- basic functionality
- */
-public class ReplicationWithMakerTest extends ConfigurableMacIT {
-
- @Override
- public int defaultTimeoutSeconds() {
- return 30;
- }
-
- @Override
- public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
- hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
- // Run the process in the master which writes replication records from metadata to replication
- // repeatedly without pause
- cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "0");
- cfg.setNumTservers(1);
- }
-
- @Test
- public void singleTableSingleTarget() throws Exception {
- // We want to kill the GC so it doesn't come along and close Status records and mess up the comparisons
- // against expected Status messages.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
- cluster.killProcess(ServerType.GARBAGE_COLLECTOR, proc);
- }
-
- Connector conn = getConnector();
- String table1 = "table1";
-
- // replication shouldn't exist when we begin
- Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
-
- // Create a table
- conn.tableOperations().create(table1);
-
- int attempts = 5;
- while (attempts > 0) {
- try {
- // Enable replication on table1
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- // Replicate table1 to cluster1 in the table with id of '4'
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
- conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
- ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "100000"));
- break;
- } catch (Exception e) {
- attempts--;
- if (attempts <= 0) {
- throw e;
- }
- UtilWaitThread.sleep(500);
- }
- }
-
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- // Make sure the replication table exists at this point
- boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
- attempts = 5;
- do {
- if (!exists) {
- UtilWaitThread.sleep(200);
- exists = conn.tableOperations().exists(ReplicationTable.NAME);
- attempts--;
- }
- } while (!exists && attempts > 0);
- Assert.assertTrue("Replication table was never created", exists);
-
- // ACCUMULO-2743 The Observer in the tserver has to be made aware of the change to get the combiner (made by the master)
- for (int i = 0; i < 5 && !conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME); i++) {
- UtilWaitThread.sleep(1000);
- }
-
- Assert.assertTrue("Combiner was never set on replication table",
- conn.tableOperations().listIterators(ReplicationTable.NAME).keySet().contains(ReplicationTable.COMBINER_NAME));
-
- // Trigger the minor compaction, waiting for it to finish.
- // This should write the entry to metadata that the file has data
- conn.tableOperations().flush(table1, null, null, true);
-
- // Make sure that we have one status element, should be a new file
- Scanner s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Entry<Key,Value> entry = null;
- Status expectedStatus = StatusUtil.openWithUnknownLength();
- attempts = 5;
- // This record will move from new to new with infinite length because of the minc (flush)
- while (null == entry && attempts > 0) {
- try {
- entry = Iterables.getOnlyElement(s);
- Status actual = Status.parseFrom(entry.getValue().get());
- if (actual.getInfiniteEnd() != expectedStatus.getInfiniteEnd()) {
- entry = null;
- // the master process didn't yet fire and write the new mutation, wait for it to do
- // so and try to read it again
- Thread.sleep(1000);
- }
- } catch (NoSuchElementException e) {
- entry = null;
- Thread.sleep(500);
- } catch (IllegalArgumentException e) {
- // saw this contain 2 elements once
- s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- for (Entry<Key,Value> content : s) {
- log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
- }
- throw e;
- } finally {
- attempts--;
- }
- }
-
- Assert.assertNotNull("Could not find expected entry in replication table", entry);
- Status actual = Status.parseFrom(entry.getValue().get());
- Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual), !actual.getClosed() && actual.getInfiniteEnd());
-
- // Try a couple of times to watch for the work record to be created
- boolean notFound = true;
- for (int i = 0; i < 10 && notFound; i++) {
- s = ReplicationTable.getScanner(conn);
- WorkSection.limit(s);
- int elementsFound = Iterables.size(s);
- if (0 < elementsFound) {
- Assert.assertEquals(1, elementsFound);
- notFound = false;
- }
- Thread.sleep(500);
- }
-
- // If we didn't find the work record, print the contents of the table
- if (notFound) {
- s = ReplicationTable.getScanner(conn);
- for (Entry<Key,Value> content : s) {
- log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
- }
- Assert.assertFalse("Did not find the work entry for the status entry", notFound);
- }
-
- // Write some more data so that we over-run the single WAL
- bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- conn.tableOperations().compact(ReplicationTable.NAME, null, null, true, true);
-
- s = ReplicationTable.getScanner(conn);
- StatusSection.limit(s);
- Assert.assertEquals(2, Iterables.size(s));
-
- // We should eventually get 2 work records recorded, need to account for a potential delay though
- // might see: status1 -> work1 -> status2 -> (our scans) -> work2
- notFound = true;
- for (int i = 0; i < 10 && notFound; i++) {
- s = ReplicationTable.getScanner(conn);
- WorkSection.limit(s);
- int elementsFound = Iterables.size(s);
- if (2 == elementsFound) {
- notFound = false;
- }
- Thread.sleep(500);
- }
-
- // If we didn't find the work record, print the contents of the table
- if (notFound) {
- s = ReplicationTable.getScanner(conn);
- for (Entry<Key,Value> content : s) {
- log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
- }
- Assert.assertFalse("Did not find the work entries for the status entries", notFound);
- }
- }
-
- @Test
- public void correctClusterNameInWorkEntry() throws Exception {
- Connector conn = getConnector();
- String table1 = "table1";
-
- // replication shouldn't exist when we begin
- Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
-
- // Create a table
- conn.tableOperations().create(table1);
-
- int attempts = 5;
- while (attempts > 0) {
- try {
- // Enable replication on table1
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- // Replicate table1 to cluster1 in the table with id of '4'
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
- attempts = 0;
- } catch (Exception e) {
- attempts--;
- if (attempts <= 0) {
- throw e;
- }
- UtilWaitThread.sleep(500);
- }
- }
-
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
- }
- bw.addMutation(m);
- }
-
- bw.close();
-
- String tableId = conn.tableOperations().tableIdMap().get(table1);
- Assert.assertNotNull("Table ID was null", tableId);
-
- // Make sure the replication table exists at this point
- boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
- attempts = 5;
- do {
- if (!exists) {
- UtilWaitThread.sleep(500);
- exists = conn.tableOperations().exists(ReplicationTable.NAME);
- attempts--;
- }
- } while (!exists && attempts > 0);
- Assert.assertTrue("Replication table did not exist", exists);
-
- for (int i = 0; i < 5 && !conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ); i++) {
- Thread.sleep(1000);
- }
-
- Assert.assertTrue(conn.securityOperations().hasTablePermission("root", ReplicationTable.NAME, TablePermission.READ));
-
- boolean notFound = true;
- Scanner s;
- for (int i = 0; i < 10 && notFound; i++) {
- s = ReplicationTable.getScanner(conn);
- WorkSection.limit(s);
- try {
- Entry<Key,Value> e = Iterables.getOnlyElement(s);
- Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
- Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
- notFound = false;
- } catch (NoSuchElementException e) {} catch (IllegalArgumentException e) {
- s = ReplicationTable.getScanner(conn);
- for (Entry<Key,Value> content : s) {
- log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
- }
- Assert.fail("Found more than one work section entry");
- }
-
- Thread.sleep(500);
- }
-
- if (notFound) {
- s = ReplicationTable.getScanner(conn);
- for (Entry<Key,Value> content : s) {
- log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
- }
- Assert.assertFalse("Did not find the work entry for the status entry", notFound);
- }
- }
-}
[3/6] git commit: ACCUMULO-2587 Fix up the tests to set the
user/passwd where required.
Posted by el...@apache.org.
ACCUMULO-2587 Fix up the tests to set the user/passwd where required.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3605275d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3605275d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3605275d
Branch: refs/heads/ACCUMULO-378
Commit: 3605275d0bf747d78f6cf5a56725f1d8af34785c
Parents: b3ef383
Author: Josh Elser <el...@apache.org>
Authored: Sun May 25 22:08:03 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Sun May 25 22:08:03 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 4 +--
.../MasterReplicationCoordinatorTest.java | 25 ++++++++++++++++-
.../test/replication/CyclicReplicationIT.java | 19 +++++++++++++
.../test/replication/ReplicationIT.java | 29 ++++++++++++++++++++
.../ReplicationPortAdvertisementIT.java | 1 -
.../replication/ReplicationSequentialIT.java | 23 ++++++++++++++++
6 files changed, 97 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3605275d/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fe0ea25..8b24332 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -467,7 +467,7 @@ public enum Property {
REPLICATION_PEER_USER("replication.peer.user.", null, PropertyType.PREFIX, "The username to provide when authenticating with the given peer"),
@Experimental
@Sensitive
- REPLICATION_PEER_PASSWORD("replication.peer.password", null, PropertyType.PREFIX, "The password to provide when authenticating with the given peer"),
+ REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX, "The password to provide when authenticating with the given peer"),
@Experimental
REPLICATION_NAME("replication.name", "", PropertyType.STRING, "Name of this cluster with respect to replication. Used to identify this instance from other peers"),
@Experimental
@@ -487,7 +487,7 @@ public enum Property {
@Experimental
REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"),
@Experimental
- REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner", PropertyType.CLASSNAME,
+ REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.SequentialWorkAssigner", PropertyType.CLASSNAME,
"Replication WorkAssigner implementation to use"),
@Experimental
REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work"),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3605275d/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java
index 045f542..1ec3f24 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/MasterReplicationCoordinatorTest.java
@@ -19,7 +19,7 @@ package org.apache.accumulo.master.replication;
import java.util.Collections;
import java.util.TreeSet;
-import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.accumulo.master.Master;
import org.apache.accumulo.server.master.state.TServerInstance;
@@ -38,6 +38,13 @@ public class MasterReplicationCoordinatorTest {
public void randomServer() {
Master master = EasyMock.createMock(Master.class);
ZooReader reader = EasyMock.createMock(ZooReader.class);
+ Instance inst = EasyMock.createMock(Instance.class);
+
+ EasyMock.expect(master.getInstance()).andReturn(inst);
+ EasyMock.expect(inst.getInstanceID()).andReturn("1234");
+
+ EasyMock.replay(master, reader, inst);
+
MasterReplicationCoordinator coordinator = new MasterReplicationCoordinator(master, reader);
TServerInstance inst1 = new TServerInstance(HostAndPort.fromParts("host1", 1234), "session");
@@ -48,6 +55,13 @@ public class MasterReplicationCoordinatorTest {
public void invalidOffset() {
Master master = EasyMock.createMock(Master.class);
ZooReader reader = EasyMock.createMock(ZooReader.class);
+ Instance inst = EasyMock.createMock(Instance.class);
+
+ EasyMock.expect(master.getInstance()).andReturn(inst);
+ EasyMock.expect(inst.getInstanceID()).andReturn("1234");
+
+ EasyMock.replay(master, reader, inst);
+
MasterReplicationCoordinator coordinator = new MasterReplicationCoordinator(master, reader);
TServerInstance inst1 = new TServerInstance(HostAndPort.fromParts("host1", 1234), "session");
@@ -58,8 +72,17 @@ public class MasterReplicationCoordinatorTest {
public void randomServerFromMany() {
Master master = EasyMock.createMock(Master.class);
ZooReader reader = EasyMock.createMock(ZooReader.class);
+ Instance inst = EasyMock.createMock(Instance.class);
+
+ EasyMock.expect(master.getInstance()).andReturn(inst);
+ EasyMock.expect(inst.getInstanceID()).andReturn("1234");
+
+ EasyMock.replay(master, reader, inst);
+
MasterReplicationCoordinator coordinator = new MasterReplicationCoordinator(master, reader);
+ EasyMock.verify(master, reader, inst);
+
TreeSet<TServerInstance> instances = new TreeSet<>();
TServerInstance inst1 = new TServerInstance(HostAndPort.fromParts("host1", 1234), "session");
instances.add(inst1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3605275d/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
index d8ef56f..a03cfab 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -36,6 +37,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.LongCombiner.Type;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
@@ -117,6 +119,19 @@ public class CyclicReplicationIT {
try {
Connector connMaster1 = master1Cluster.getConnector("root", password), connMaster2 = master2Cluster.getConnector("root", password);
+ String master1UserName = "master1", master1Password = "foo";
+ String master2UserName = "master2", master2Password = "bar";
+
+ connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password));
+ connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password));
+
+ // Configure the credentials we should use to authenticate ourselves to the peer for replication
+ connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master2Cluster.getInstanceName(), master2UserName);
+ connMaster1.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master2Cluster.getInstanceName(), master2Password);
+
+ connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + master1Cluster.getInstanceName(), master1UserName);
+ connMaster2.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + master1Cluster.getInstanceName(), master1Password);
+
connMaster1.instanceOperations().setProperty(
Property.REPLICATION_PEERS.getKey() + master2Cluster.getInstanceName(),
ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
@@ -145,6 +160,10 @@ public class CyclicReplicationIT {
connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(),
Property.TABLE_REPLICATION_TARGETS.getKey() + master1Cluster.getInstanceName(), master1TableId);
+ // Give our replication user the ability to write to the respective table
+ connMaster1.securityOperations().grantTablePermission(master1UserName, master1Cluster.getInstanceName(), TablePermission.WRITE);
+ connMaster2.securityOperations().grantTablePermission(master2UserName, master2Cluster.getInstanceName(), TablePermission.WRITE);
+
IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class);
SummingCombiner.setEncodingType(summingCombiner, Type.STRING);
SummingCombiner.setCombineAllColumns(summingCombiner, true);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3605275d/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 77dcceb..f34b626 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -25,6 +25,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -35,6 +36,7 @@ import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.StatusUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
@@ -84,6 +86,12 @@ public class ReplicationIT extends ConfigurableMacIT {
Connector connMaster = getConnector();
Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
+ String peerUserName = "repl";
+ String peerPassword = "passwd";
+
+ // Create a user on the peer for replication to use
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
String peerClusterName = "peer";
// ...peer = AccumuloReplicaSystem,instanceName,zookeepers
@@ -92,6 +100,10 @@ public class ReplicationIT extends ConfigurableMacIT {
ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
+ // Configure the credentials we should use to authenticate ourselves to the peer for replication
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
String masterTable = "master", peerTable = "peer";
connMaster.tableOperations().create(masterTable);
@@ -102,6 +114,9 @@ public class ReplicationIT extends ConfigurableMacIT {
String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
Assert.assertNotNull(peerTableId);
+ // Give our replication user the ability to write to the table
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
+
// Replicate this table to the peerClusterName in a table with the peerTableId table id
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
@@ -173,6 +188,16 @@ public class ReplicationIT extends ConfigurableMacIT {
String peerClusterName = "peer";
+ String peerUserName = "repl";
+ String peerPassword = "passwd";
+
+ // Create a user on the peer for replication to use
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ // Configure the credentials we should use to authenticate ourselves to the peer for replication
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
+
// ...peer = AccumuloReplicaSystem,instanceName,zookeepers
connMaster.instanceOperations().setProperty(
Property.REPLICATION_PEERS.getKey() + peerClusterName,
@@ -197,6 +222,10 @@ public class ReplicationIT extends ConfigurableMacIT {
String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
Assert.assertNotNull(peerTableId2);
+ // Give our replication user the ability to write to the tables
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
+
// Replicate this table to the peerClusterName in a table with the peerTableId table id
connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3605275d/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
index f879895..0afbc05 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationPortAdvertisementIT.java
@@ -24,7 +24,6 @@ import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooReader;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3605275d/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
index ac2f25c..c7c36e8 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationSequentialIT.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
@@ -41,6 +42,7 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSectio
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.master.replication.SequentialWorkAssigner;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
@@ -110,8 +112,15 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
ReplicationTable.create(connMaster);
+
+ String peerUserName = "peer", peerPassword = "foo";
String peerClusterName = "peer";
+
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
// ...peer = AccumuloReplicaSystem,instanceName,zookeepers
connMaster.instanceOperations().setProperty(
@@ -128,6 +137,8 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
connPeer.tableOperations().create(peerTable);
String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
Assert.assertNotNull(peerTableId);
+
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
// Replicate this table to the peerClusterName in a table with the peerTableId table id
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
@@ -262,6 +273,13 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
Connector connPeer = peer1Cluster.getConnector("root", ROOT_PASSWORD);
String peerClusterName = "peer";
+ String peerUserName = "peer", peerPassword = "foo";
+
+ // Create local user
+ connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
+
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
+ connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
// ...peer = AccumuloReplicaSystem,instanceName,zookeepers
connMaster.instanceOperations().setProperty(
@@ -271,6 +289,7 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
String masterTable1 = "master1", peerTable1 = "peer1", masterTable2 = "master2", peerTable2 = "peer2";
+ // Create tables
connMaster.tableOperations().create(masterTable1);
String masterTableId1 = connMaster.tableOperations().tableIdMap().get(masterTable1);
Assert.assertNotNull(masterTableId1);
@@ -287,6 +306,10 @@ public class ReplicationSequentialIT extends ConfigurableMacIT {
String peerTableId2 = connPeer.tableOperations().tableIdMap().get(peerTable2);
Assert.assertNotNull(peerTableId2);
+ // Grant write permission
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable1, TablePermission.WRITE);
+ connPeer.securityOperations().grantTablePermission(peerUserName, peerTable2, TablePermission.WRITE);
+
// Replicate this table to the peerClusterName in a table with the peerTableId table id
connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION.getKey(), "true");
connMaster.tableOperations().setProperty(masterTable1, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId1);
[2/6] git commit: ACCUMULO-2587 First addition of authentication
between replication service and client
Posted by el...@apache.org.
ACCUMULO-2587 First addition of authentication between replication service and client
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b3ef383d
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b3ef383d
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b3ef383d
Branch: refs/heads/ACCUMULO-378
Commit: b3ef383d12741229ef7dc11014821225f7fcfcf5
Parents: 2425fd2
Author: Josh Elser <el...@apache.org>
Authored: Fri May 23 14:31:03 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri May 23 14:31:03 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 5 +
.../replication/RemoteReplicationErrorCode.java | 28 -
.../ReplicationCoordinatorErrorCode.java | 26 -
.../thrift/RemoteCoordinationException.java | 500 ------------------
.../thrift/RemoteReplicationErrorCode.java | 70 +++
.../thrift/RemoteReplicationException.java | 78 +--
.../thrift/ReplicationCoordinator.java | 157 +++++-
.../thrift/ReplicationCoordinatorErrorCode.java | 64 +++
.../thrift/ReplicationCoordinatorException.java | 518 +++++++++++++++++++
.../replication/thrift/ReplicationServicer.java | 270 +++++++++-
core/src/main/thrift/replication.thrift | 27 +-
.../MasterReplicationCoordinator.java | 24 +-
.../replication/AccumuloReplicaSystem.java | 34 +-
.../BatchWriterReplicationReplayer.java | 10 +-
.../replication/ReplicationServicerHandler.java | 18 +-
15 files changed, 1164 insertions(+), 665 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 08665df..fe0ea25 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -464,6 +464,11 @@ public enum Property {
@Experimental
REPLICATION_PEERS("replication.peer.", null, PropertyType.PREFIX, "Properties in this category control what systems data can be replicated to"),
@Experimental
+ REPLICATION_PEER_USER("replication.peer.user.", null, PropertyType.PREFIX, "The username to provide when authenticating with the given peer"),
+ @Experimental
+ @Sensitive
+ REPLICATION_PEER_PASSWORD("replication.peer.password.", null, PropertyType.PREFIX, "The password to provide when authenticating with the given peer"), // prefix must end with '.': callers append the peer name directly to getKey()
+ @Experimental
REPLICATION_NAME("replication.name", "", PropertyType.STRING, "Name of this cluster with respect to replication. Used to identify this instance from other peers"),
@Experimental
REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "20000000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java b/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java
deleted file mode 100644
index 9f9eef7..0000000
--- a/core/src/main/java/org/apache/accumulo/core/replication/RemoteReplicationErrorCode.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication;
-
-/**
- *
- */
-public enum RemoteReplicationErrorCode {
- COULD_NOT_DESERIALIZE,
- COULD_NOT_APPLY,
- TABLE_DOES_NOT_EXIST,
- CANNOT_AUTHENTICATE,
- CANNOT_INSTANTIATE_REPLAYER,
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/ReplicationCoordinatorErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationCoordinatorErrorCode.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationCoordinatorErrorCode.java
deleted file mode 100644
index d571fe0..0000000
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationCoordinatorErrorCode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.replication;
-
-/**
- *
- */
-public enum ReplicationCoordinatorErrorCode {
-
- NO_AVAILABLE_SERVERS,
- SERVICE_CONFIGURATION_UNAVAILABLE,
-}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteCoordinationException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteCoordinationException.java b/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteCoordinationException.java
deleted file mode 100644
index a63cd3d..0000000
--- a/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteCoordinationException.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Autogenerated by Thrift Compiler (0.9.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- * @generated
- */
-package org.apache.accumulo.core.replication.thrift;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("all") public class RemoteCoordinationException extends TException implements org.apache.thrift.TBase<RemoteCoordinationException, RemoteCoordinationException._Fields>, java.io.Serializable, Cloneable {
- private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteCoordinationException");
-
- private static final org.apache.thrift.protocol.TField CODE_FIELD_DESC = new org.apache.thrift.protocol.TField("code", org.apache.thrift.protocol.TType.I32, (short)1);
- private static final org.apache.thrift.protocol.TField REASON_FIELD_DESC = new org.apache.thrift.protocol.TField("reason", org.apache.thrift.protocol.TType.STRING, (short)2);
-
- private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
- static {
- schemes.put(StandardScheme.class, new RemoteCoordinationExceptionStandardSchemeFactory());
- schemes.put(TupleScheme.class, new RemoteCoordinationExceptionTupleSchemeFactory());
- }
-
- public int code; // required
- public String reason; // required
-
- /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
- @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- CODE((short)1, "code"),
- REASON((short)2, "reason");
-
- private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
-
- static {
- for (_Fields field : EnumSet.allOf(_Fields.class)) {
- byName.put(field.getFieldName(), field);
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, or null if its not found.
- */
- public static _Fields findByThriftId(int fieldId) {
- switch(fieldId) {
- case 1: // CODE
- return CODE;
- case 2: // REASON
- return REASON;
- default:
- return null;
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, throwing an exception
- * if it is not found.
- */
- public static _Fields findByThriftIdOrThrow(int fieldId) {
- _Fields fields = findByThriftId(fieldId);
- if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
- return fields;
- }
-
- /**
- * Find the _Fields constant that matches name, or null if its not found.
- */
- public static _Fields findByName(String name) {
- return byName.get(name);
- }
-
- private final short _thriftId;
- private final String _fieldName;
-
- _Fields(short thriftId, String fieldName) {
- _thriftId = thriftId;
- _fieldName = fieldName;
- }
-
- public short getThriftFieldId() {
- return _thriftId;
- }
-
- public String getFieldName() {
- return _fieldName;
- }
- }
-
- // isset id assignments
- private static final int __CODE_ISSET_ID = 0;
- private byte __isset_bitfield = 0;
- public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
- static {
- Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.CODE, new org.apache.thrift.meta_data.FieldMetaData("code", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
- tmpMap.put(_Fields.REASON, new org.apache.thrift.meta_data.FieldMetaData("reason", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- metaDataMap = Collections.unmodifiableMap(tmpMap);
- org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteCoordinationException.class, metaDataMap);
- }
-
- public RemoteCoordinationException() {
- }
-
- public RemoteCoordinationException(
- int code,
- String reason)
- {
- this();
- this.code = code;
- setCodeIsSet(true);
- this.reason = reason;
- }
-
- /**
- * Performs a deep copy on <i>other</i>.
- */
- public RemoteCoordinationException(RemoteCoordinationException other) {
- __isset_bitfield = other.__isset_bitfield;
- this.code = other.code;
- if (other.isSetReason()) {
- this.reason = other.reason;
- }
- }
-
- public RemoteCoordinationException deepCopy() {
- return new RemoteCoordinationException(this);
- }
-
- @Override
- public void clear() {
- setCodeIsSet(false);
- this.code = 0;
- this.reason = null;
- }
-
- public int getCode() {
- return this.code;
- }
-
- public RemoteCoordinationException setCode(int code) {
- this.code = code;
- setCodeIsSet(true);
- return this;
- }
-
- public void unsetCode() {
- __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __CODE_ISSET_ID);
- }
-
- /** Returns true if field code is set (has been assigned a value) and false otherwise */
- public boolean isSetCode() {
- return EncodingUtils.testBit(__isset_bitfield, __CODE_ISSET_ID);
- }
-
- public void setCodeIsSet(boolean value) {
- __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __CODE_ISSET_ID, value);
- }
-
- public String getReason() {
- return this.reason;
- }
-
- public RemoteCoordinationException setReason(String reason) {
- this.reason = reason;
- return this;
- }
-
- public void unsetReason() {
- this.reason = null;
- }
-
- /** Returns true if field reason is set (has been assigned a value) and false otherwise */
- public boolean isSetReason() {
- return this.reason != null;
- }
-
- public void setReasonIsSet(boolean value) {
- if (!value) {
- this.reason = null;
- }
- }
-
- public void setFieldValue(_Fields field, Object value) {
- switch (field) {
- case CODE:
- if (value == null) {
- unsetCode();
- } else {
- setCode((Integer)value);
- }
- break;
-
- case REASON:
- if (value == null) {
- unsetReason();
- } else {
- setReason((String)value);
- }
- break;
-
- }
- }
-
- public Object getFieldValue(_Fields field) {
- switch (field) {
- case CODE:
- return Integer.valueOf(getCode());
-
- case REASON:
- return getReason();
-
- }
- throw new IllegalStateException();
- }
-
- /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
- public boolean isSet(_Fields field) {
- if (field == null) {
- throw new IllegalArgumentException();
- }
-
- switch (field) {
- case CODE:
- return isSetCode();
- case REASON:
- return isSetReason();
- }
- throw new IllegalStateException();
- }
-
- @Override
- public boolean equals(Object that) {
- if (that == null)
- return false;
- if (that instanceof RemoteCoordinationException)
- return this.equals((RemoteCoordinationException)that);
- return false;
- }
-
- public boolean equals(RemoteCoordinationException that) {
- if (that == null)
- return false;
-
- boolean this_present_code = true;
- boolean that_present_code = true;
- if (this_present_code || that_present_code) {
- if (!(this_present_code && that_present_code))
- return false;
- if (this.code != that.code)
- return false;
- }
-
- boolean this_present_reason = true && this.isSetReason();
- boolean that_present_reason = true && that.isSetReason();
- if (this_present_reason || that_present_reason) {
- if (!(this_present_reason && that_present_reason))
- return false;
- if (!this.reason.equals(that.reason))
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return 0;
- }
-
- public int compareTo(RemoteCoordinationException other) {
- if (!getClass().equals(other.getClass())) {
- return getClass().getName().compareTo(other.getClass().getName());
- }
-
- int lastComparison = 0;
- RemoteCoordinationException typedOther = (RemoteCoordinationException)other;
-
- lastComparison = Boolean.valueOf(isSetCode()).compareTo(typedOther.isSetCode());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetCode()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.code, typedOther.code);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = Boolean.valueOf(isSetReason()).compareTo(typedOther.isSetReason());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (isSetReason()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.reason, typedOther.reason);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- return 0;
- }
-
- public _Fields fieldForId(int fieldId) {
- return _Fields.findByThriftId(fieldId);
- }
-
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
- schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
- schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("RemoteCoordinationException(");
- boolean first = true;
-
- sb.append("code:");
- sb.append(this.code);
- first = false;
- if (!first) sb.append(", ");
- sb.append("reason:");
- if (this.reason == null) {
- sb.append("null");
- } else {
- sb.append(this.reason);
- }
- first = false;
- sb.append(")");
- return sb.toString();
- }
-
- public void validate() throws org.apache.thrift.TException {
- // check for required fields
- // check for sub-struct validity
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
- try {
- write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
- try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bitfield = 0;
- read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private static class RemoteCoordinationExceptionStandardSchemeFactory implements SchemeFactory {
- public RemoteCoordinationExceptionStandardScheme getScheme() {
- return new RemoteCoordinationExceptionStandardScheme();
- }
- }
-
- private static class RemoteCoordinationExceptionStandardScheme extends StandardScheme<RemoteCoordinationException> {
-
- public void read(org.apache.thrift.protocol.TProtocol iprot, RemoteCoordinationException struct) throws org.apache.thrift.TException {
- org.apache.thrift.protocol.TField schemeField;
- iprot.readStructBegin();
- while (true)
- {
- schemeField = iprot.readFieldBegin();
- if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
- break;
- }
- switch (schemeField.id) {
- case 1: // CODE
- if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
- struct.code = iprot.readI32();
- struct.setCodeIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- case 2: // REASON
- if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
- struct.reason = iprot.readString();
- struct.setReasonIsSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- break;
- default:
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
-
- // check for required fields of primitive type, which can't be checked in the validate method
- struct.validate();
- }
-
- public void write(org.apache.thrift.protocol.TProtocol oprot, RemoteCoordinationException struct) throws org.apache.thrift.TException {
- struct.validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- oprot.writeFieldBegin(CODE_FIELD_DESC);
- oprot.writeI32(struct.code);
- oprot.writeFieldEnd();
- if (struct.reason != null) {
- oprot.writeFieldBegin(REASON_FIELD_DESC);
- oprot.writeString(struct.reason);
- oprot.writeFieldEnd();
- }
- oprot.writeFieldStop();
- oprot.writeStructEnd();
- }
-
- }
-
- private static class RemoteCoordinationExceptionTupleSchemeFactory implements SchemeFactory {
- public RemoteCoordinationExceptionTupleScheme getScheme() {
- return new RemoteCoordinationExceptionTupleScheme();
- }
- }
-
- private static class RemoteCoordinationExceptionTupleScheme extends TupleScheme<RemoteCoordinationException> {
-
- @Override
- public void write(org.apache.thrift.protocol.TProtocol prot, RemoteCoordinationException struct) throws org.apache.thrift.TException {
- TTupleProtocol oprot = (TTupleProtocol) prot;
- BitSet optionals = new BitSet();
- if (struct.isSetCode()) {
- optionals.set(0);
- }
- if (struct.isSetReason()) {
- optionals.set(1);
- }
- oprot.writeBitSet(optionals, 2);
- if (struct.isSetCode()) {
- oprot.writeI32(struct.code);
- }
- if (struct.isSetReason()) {
- oprot.writeString(struct.reason);
- }
- }
-
- @Override
- public void read(org.apache.thrift.protocol.TProtocol prot, RemoteCoordinationException struct) throws org.apache.thrift.TException {
- TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(2);
- if (incoming.get(0)) {
- struct.code = iprot.readI32();
- struct.setCodeIsSet(true);
- }
- if (incoming.get(1)) {
- struct.reason = iprot.readString();
- struct.setReasonIsSet(true);
- }
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteReplicationErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteReplicationErrorCode.java b/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteReplicationErrorCode.java
new file mode 100644
index 0000000..5da3327
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteReplicationErrorCode.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.accumulo.core.replication.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+@SuppressWarnings("all") public enum RemoteReplicationErrorCode implements org.apache.thrift.TEnum { // typed error code; each constant carries an explicit integer value
+ COULD_NOT_DESERIALIZE(0),
+ COULD_NOT_APPLY(1),
+ TABLE_DOES_NOT_EXIST(2),
+ CANNOT_AUTHENTICATE(3),
+ CANNOT_INSTANTIATE_REPLAYER(4);
+
+ private final int value; // integer value associated with this constant
+
+ private RemoteReplicationErrorCode(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum constant, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find the enum constant that corresponds to the given integer value, as defined in the Thrift IDL.
+ * @return the matching constant, or null if the value is not found.
+ */
+ public static RemoteReplicationErrorCode findByValue(int value) {
+ switch (value) {
+ case 0:
+ return COULD_NOT_DESERIALIZE;
+ case 1:
+ return COULD_NOT_APPLY;
+ case 2:
+ return TABLE_DOES_NOT_EXIST;
+ case 3:
+ return CANNOT_AUTHENTICATE;
+ case 4:
+ return CANNOT_INSTANTIATE_REPLAYER;
+ default: // any value outside 0-4 has no constant
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteReplicationException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteReplicationException.java b/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteReplicationException.java
index f93bbba..e825d2c 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteReplicationException.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/thrift/RemoteReplicationException.java
@@ -58,11 +58,19 @@ import org.slf4j.LoggerFactory;
schemes.put(TupleScheme.class, new RemoteReplicationExceptionTupleSchemeFactory());
}
- public int code; // required
+ /**
+ *
+ * @see RemoteReplicationErrorCode
+ */
+ public RemoteReplicationErrorCode code; // required
public String reason; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ /**
+ *
+ * @see RemoteReplicationErrorCode
+ */
CODE((short)1, "code"),
REASON((short)2, "reason");
@@ -123,13 +131,11 @@ import org.slf4j.LoggerFactory;
}
// isset id assignments
- private static final int __CODE_ISSET_ID = 0;
- private byte __isset_bitfield = 0;
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.CODE, new org.apache.thrift.meta_data.FieldMetaData("code", org.apache.thrift.TFieldRequirementType.DEFAULT,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, RemoteReplicationErrorCode.class)));
tmpMap.put(_Fields.REASON, new org.apache.thrift.meta_data.FieldMetaData("reason", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
@@ -140,12 +146,11 @@ import org.slf4j.LoggerFactory;
}
public RemoteReplicationException(
- int code,
+ RemoteReplicationErrorCode code,
String reason)
{
this();
this.code = code;
- setCodeIsSet(true);
this.reason = reason;
}
@@ -153,8 +158,9 @@ import org.slf4j.LoggerFactory;
* Performs a deep copy on <i>other</i>.
*/
public RemoteReplicationException(RemoteReplicationException other) {
- __isset_bitfield = other.__isset_bitfield;
- this.code = other.code;
+ if (other.isSetCode()) {
+ this.code = other.code;
+ }
if (other.isSetReason()) {
this.reason = other.reason;
}
@@ -166,32 +172,40 @@ import org.slf4j.LoggerFactory;
@Override
public void clear() {
- setCodeIsSet(false);
- this.code = 0;
+ this.code = null;
this.reason = null;
}
- public int getCode() {
+ /**
+ *
+ * @see RemoteReplicationErrorCode
+ */
+ public RemoteReplicationErrorCode getCode() {
return this.code;
}
- public RemoteReplicationException setCode(int code) {
+ /**
+ *
+ * @see RemoteReplicationErrorCode
+ */
+ public RemoteReplicationException setCode(RemoteReplicationErrorCode code) {
this.code = code;
- setCodeIsSet(true);
return this;
}
public void unsetCode() {
- __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __CODE_ISSET_ID);
+ this.code = null;
}
/** Returns true if field code is set (has been assigned a value) and false otherwise */
public boolean isSetCode() {
- return EncodingUtils.testBit(__isset_bitfield, __CODE_ISSET_ID);
+ return this.code != null;
}
public void setCodeIsSet(boolean value) {
- __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __CODE_ISSET_ID, value);
+ if (!value) {
+ this.code = null;
+ }
}
public String getReason() {
@@ -224,7 +238,7 @@ import org.slf4j.LoggerFactory;
if (value == null) {
unsetCode();
} else {
- setCode((Integer)value);
+ setCode((RemoteReplicationErrorCode)value);
}
break;
@@ -242,7 +256,7 @@ import org.slf4j.LoggerFactory;
public Object getFieldValue(_Fields field) {
switch (field) {
case CODE:
- return Integer.valueOf(getCode());
+ return getCode();
case REASON:
return getReason();
@@ -279,12 +293,12 @@ import org.slf4j.LoggerFactory;
if (that == null)
return false;
- boolean this_present_code = true;
- boolean that_present_code = true;
+ boolean this_present_code = true && this.isSetCode();
+ boolean that_present_code = true && that.isSetCode();
if (this_present_code || that_present_code) {
if (!(this_present_code && that_present_code))
return false;
- if (this.code != that.code)
+ if (!this.code.equals(that.code))
return false;
}
@@ -354,7 +368,11 @@ import org.slf4j.LoggerFactory;
boolean first = true;
sb.append("code:");
- sb.append(this.code);
+ if (this.code == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.code);
+ }
first = false;
if (!first) sb.append(", ");
sb.append("reason:");
@@ -383,8 +401,6 @@ import org.slf4j.LoggerFactory;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -411,7 +427,7 @@ import org.slf4j.LoggerFactory;
switch (schemeField.id) {
case 1: // CODE
if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
- struct.code = iprot.readI32();
+ struct.code = RemoteReplicationErrorCode.findByValue(iprot.readI32());
struct.setCodeIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
@@ -440,9 +456,11 @@ import org.slf4j.LoggerFactory;
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
- oprot.writeFieldBegin(CODE_FIELD_DESC);
- oprot.writeI32(struct.code);
- oprot.writeFieldEnd();
+ if (struct.code != null) {
+ oprot.writeFieldBegin(CODE_FIELD_DESC);
+ oprot.writeI32(struct.code.getValue());
+ oprot.writeFieldEnd();
+ }
if (struct.reason != null) {
oprot.writeFieldBegin(REASON_FIELD_DESC);
oprot.writeString(struct.reason);
@@ -474,7 +492,7 @@ import org.slf4j.LoggerFactory;
}
oprot.writeBitSet(optionals, 2);
if (struct.isSetCode()) {
- oprot.writeI32(struct.code);
+ oprot.writeI32(struct.code.getValue());
}
if (struct.isSetReason()) {
oprot.writeString(struct.reason);
@@ -486,7 +504,7 @@ import org.slf4j.LoggerFactory;
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
- struct.code = iprot.readI32();
+ struct.code = RemoteReplicationErrorCode.findByValue(iprot.readI32());
struct.setCodeIsSet(true);
}
if (incoming.get(1)) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinator.java b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinator.java
index ebc7c61..1ee9b91 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinator.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinator.java
@@ -50,13 +50,13 @@ import org.slf4j.LoggerFactory;
public interface Iface {
- public String getServicerAddress(int remoteTableId) throws RemoteCoordinationException, org.apache.thrift.TException;
+ public String getServicerAddress(int remoteTableId, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws ReplicationCoordinatorException, org.apache.thrift.TException;
}
public interface AsyncIface {
- public void getServicerAddress(int remoteTableId, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.getServicerAddress_call> resultHandler) throws org.apache.thrift.TException;
+ public void getServicerAddress(int remoteTableId, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<AsyncClient.getServicerAddress_call> resultHandler) throws org.apache.thrift.TException;
}
@@ -80,20 +80,21 @@ import org.slf4j.LoggerFactory;
super(iprot, oprot);
}
- public String getServicerAddress(int remoteTableId) throws RemoteCoordinationException, org.apache.thrift.TException
+ public String getServicerAddress(int remoteTableId, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws ReplicationCoordinatorException, org.apache.thrift.TException
{
- send_getServicerAddress(remoteTableId);
+ send_getServicerAddress(remoteTableId, credentials);
return recv_getServicerAddress();
}
- public void send_getServicerAddress(int remoteTableId) throws org.apache.thrift.TException
+ public void send_getServicerAddress(int remoteTableId, org.apache.accumulo.core.security.thrift.TCredentials credentials) throws org.apache.thrift.TException
{
getServicerAddress_args args = new getServicerAddress_args();
args.setRemoteTableId(remoteTableId);
+ args.setCredentials(credentials);
sendBase("getServicerAddress", args);
}
- public String recv_getServicerAddress() throws RemoteCoordinationException, org.apache.thrift.TException
+ public String recv_getServicerAddress() throws ReplicationCoordinatorException, org.apache.thrift.TException
{
getServicerAddress_result result = new getServicerAddress_result();
receiveBase(result, "getServicerAddress");
@@ -124,29 +125,32 @@ import org.slf4j.LoggerFactory;
super(protocolFactory, clientManager, transport);
}
- public void getServicerAddress(int remoteTableId, org.apache.thrift.async.AsyncMethodCallback<getServicerAddress_call> resultHandler) throws org.apache.thrift.TException {
+ public void getServicerAddress(int remoteTableId, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<getServicerAddress_call> resultHandler) throws org.apache.thrift.TException {
checkReady();
- getServicerAddress_call method_call = new getServicerAddress_call(remoteTableId, resultHandler, this, ___protocolFactory, ___transport);
+ getServicerAddress_call method_call = new getServicerAddress_call(remoteTableId, credentials, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
public static class getServicerAddress_call extends org.apache.thrift.async.TAsyncMethodCall {
private int remoteTableId;
- public getServicerAddress_call(int remoteTableId, org.apache.thrift.async.AsyncMethodCallback<getServicerAddress_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private org.apache.accumulo.core.security.thrift.TCredentials credentials;
+ public getServicerAddress_call(int remoteTableId, org.apache.accumulo.core.security.thrift.TCredentials credentials, org.apache.thrift.async.AsyncMethodCallback<getServicerAddress_call> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.remoteTableId = remoteTableId;
+ this.credentials = credentials;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("getServicerAddress", org.apache.thrift.protocol.TMessageType.CALL, 0));
getServicerAddress_args args = new getServicerAddress_args();
args.setRemoteTableId(remoteTableId);
+ args.setCredentials(credentials);
args.write(prot);
prot.writeMessageEnd();
}
- public String getResult() throws RemoteCoordinationException, org.apache.thrift.TException {
+ public String getResult() throws ReplicationCoordinatorException, org.apache.thrift.TException {
if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
throw new IllegalStateException("Method call not finished!");
}
@@ -189,8 +193,8 @@ import org.slf4j.LoggerFactory;
public getServicerAddress_result getResult(I iface, getServicerAddress_args args) throws org.apache.thrift.TException {
getServicerAddress_result result = new getServicerAddress_result();
try {
- result.success = iface.getServicerAddress(args.remoteTableId);
- } catch (RemoteCoordinationException e) {
+ result.success = iface.getServicerAddress(args.remoteTableId, args.credentials);
+ } catch (ReplicationCoordinatorException e) {
result.e = e;
}
return result;
@@ -203,6 +207,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("getServicerAddress_args");
private static final org.apache.thrift.protocol.TField REMOTE_TABLE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("remoteTableId", org.apache.thrift.protocol.TType.I32, (short)1);
+ private static final org.apache.thrift.protocol.TField CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials", org.apache.thrift.protocol.TType.STRUCT, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -211,10 +216,12 @@ import org.slf4j.LoggerFactory;
}
public int remoteTableId; // required
+ public org.apache.accumulo.core.security.thrift.TCredentials credentials; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
- REMOTE_TABLE_ID((short)1, "remoteTableId");
+ REMOTE_TABLE_ID((short)1, "remoteTableId"),
+ CREDENTIALS((short)2, "credentials");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -231,6 +238,8 @@ import org.slf4j.LoggerFactory;
switch(fieldId) {
case 1: // REMOTE_TABLE_ID
return REMOTE_TABLE_ID;
+ case 2: // CREDENTIALS
+ return CREDENTIALS;
default:
return null;
}
@@ -278,6 +287,8 @@ import org.slf4j.LoggerFactory;
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.REMOTE_TABLE_ID, new org.apache.thrift.meta_data.FieldMetaData("remoteTableId", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ tmpMap.put(_Fields.CREDENTIALS, new org.apache.thrift.meta_data.FieldMetaData("credentials", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.accumulo.core.security.thrift.TCredentials.class)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(getServicerAddress_args.class, metaDataMap);
}
@@ -286,11 +297,13 @@ import org.slf4j.LoggerFactory;
}
public getServicerAddress_args(
- int remoteTableId)
+ int remoteTableId,
+ org.apache.accumulo.core.security.thrift.TCredentials credentials)
{
this();
this.remoteTableId = remoteTableId;
setRemoteTableIdIsSet(true);
+ this.credentials = credentials;
}
/**
@@ -299,6 +312,9 @@ import org.slf4j.LoggerFactory;
public getServicerAddress_args(getServicerAddress_args other) {
__isset_bitfield = other.__isset_bitfield;
this.remoteTableId = other.remoteTableId;
+ if (other.isSetCredentials()) {
+ this.credentials = new org.apache.accumulo.core.security.thrift.TCredentials(other.credentials);
+ }
}
public getServicerAddress_args deepCopy() {
@@ -309,6 +325,7 @@ import org.slf4j.LoggerFactory;
public void clear() {
setRemoteTableIdIsSet(false);
this.remoteTableId = 0;
+ this.credentials = null;
}
public int getRemoteTableId() {
@@ -334,6 +351,30 @@ import org.slf4j.LoggerFactory;
__isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __REMOTETABLEID_ISSET_ID, value);
}
+ public org.apache.accumulo.core.security.thrift.TCredentials getCredentials() {
+ return this.credentials;
+ }
+
+ public getServicerAddress_args setCredentials(org.apache.accumulo.core.security.thrift.TCredentials credentials) {
+ this.credentials = credentials;
+ return this;
+ }
+
+ public void unsetCredentials() {
+ this.credentials = null;
+ }
+
+ /** Returns true if field credentials is set (has been assigned a value) and false otherwise */
+ public boolean isSetCredentials() {
+ return this.credentials != null;
+ }
+
+ public void setCredentialsIsSet(boolean value) {
+ if (!value) {
+ this.credentials = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case REMOTE_TABLE_ID:
@@ -344,6 +385,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case CREDENTIALS:
+ if (value == null) {
+ unsetCredentials();
+ } else {
+ setCredentials((org.apache.accumulo.core.security.thrift.TCredentials)value);
+ }
+ break;
+
}
}
@@ -352,6 +401,9 @@ import org.slf4j.LoggerFactory;
case REMOTE_TABLE_ID:
return Integer.valueOf(getRemoteTableId());
+ case CREDENTIALS:
+ return getCredentials();
+
}
throw new IllegalStateException();
}
@@ -365,6 +417,8 @@ import org.slf4j.LoggerFactory;
switch (field) {
case REMOTE_TABLE_ID:
return isSetRemoteTableId();
+ case CREDENTIALS:
+ return isSetCredentials();
}
throw new IllegalStateException();
}
@@ -391,6 +445,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_credentials = true && this.isSetCredentials();
+ boolean that_present_credentials = true && that.isSetCredentials();
+ if (this_present_credentials || that_present_credentials) {
+ if (!(this_present_credentials && that_present_credentials))
+ return false;
+ if (!this.credentials.equals(that.credentials))
+ return false;
+ }
+
return true;
}
@@ -417,6 +480,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetCredentials()).compareTo(typedOther.isSetCredentials());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCredentials()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.credentials, typedOther.credentials);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -440,6 +513,14 @@ import org.slf4j.LoggerFactory;
sb.append("remoteTableId:");
sb.append(this.remoteTableId);
first = false;
+ if (!first) sb.append(", ");
+ sb.append("credentials:");
+ if (this.credentials == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.credentials);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -447,6 +528,9 @@ import org.slf4j.LoggerFactory;
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
+ if (credentials != null) {
+ credentials.validate();
+ }
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -493,6 +577,15 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 2: // CREDENTIALS
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+ struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+ struct.credentials.read(iprot);
+ struct.setCredentialsIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -511,6 +604,11 @@ import org.slf4j.LoggerFactory;
oprot.writeFieldBegin(REMOTE_TABLE_ID_FIELD_DESC);
oprot.writeI32(struct.remoteTableId);
oprot.writeFieldEnd();
+ if (struct.credentials != null) {
+ oprot.writeFieldBegin(CREDENTIALS_FIELD_DESC);
+ struct.credentials.write(oprot);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -532,20 +630,31 @@ import org.slf4j.LoggerFactory;
if (struct.isSetRemoteTableId()) {
optionals.set(0);
}
- oprot.writeBitSet(optionals, 1);
+ if (struct.isSetCredentials()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
if (struct.isSetRemoteTableId()) {
oprot.writeI32(struct.remoteTableId);
}
+ if (struct.isSetCredentials()) {
+ struct.credentials.write(oprot);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, getServicerAddress_args struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
- BitSet incoming = iprot.readBitSet(1);
+ BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
struct.remoteTableId = iprot.readI32();
struct.setRemoteTableIdIsSet(true);
}
+ if (incoming.get(1)) {
+ struct.credentials = new org.apache.accumulo.core.security.thrift.TCredentials();
+ struct.credentials.read(iprot);
+ struct.setCredentialsIsSet(true);
+ }
}
}
@@ -564,7 +673,7 @@ import org.slf4j.LoggerFactory;
}
public String success; // required
- public RemoteCoordinationException e; // required
+ public ReplicationCoordinatorException e; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -644,7 +753,7 @@ import org.slf4j.LoggerFactory;
public getServicerAddress_result(
String success,
- RemoteCoordinationException e)
+ ReplicationCoordinatorException e)
{
this();
this.success = success;
@@ -659,7 +768,7 @@ import org.slf4j.LoggerFactory;
this.success = other.success;
}
if (other.isSetE()) {
- this.e = new RemoteCoordinationException(other.e);
+ this.e = new ReplicationCoordinatorException(other.e);
}
}
@@ -697,11 +806,11 @@ import org.slf4j.LoggerFactory;
}
}
- public RemoteCoordinationException getE() {
+ public ReplicationCoordinatorException getE() {
return this.e;
}
- public getServicerAddress_result setE(RemoteCoordinationException e) {
+ public getServicerAddress_result setE(ReplicationCoordinatorException e) {
this.e = e;
return this;
}
@@ -735,7 +844,7 @@ import org.slf4j.LoggerFactory;
if (value == null) {
unsetE();
} else {
- setE((RemoteCoordinationException)value);
+ setE((ReplicationCoordinatorException)value);
}
break;
@@ -924,7 +1033,7 @@ import org.slf4j.LoggerFactory;
break;
case 1: // E
if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
- struct.e = new RemoteCoordinationException();
+ struct.e = new ReplicationCoordinatorException();
struct.e.read(iprot);
struct.setEIsSet(true);
} else {
@@ -998,7 +1107,7 @@ import org.slf4j.LoggerFactory;
struct.setSuccessIsSet(true);
}
if (incoming.get(1)) {
- struct.e = new RemoteCoordinationException();
+ struct.e = new ReplicationCoordinatorException();
struct.e.read(iprot);
struct.setEIsSet(true);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinatorErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinatorErrorCode.java b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinatorErrorCode.java
new file mode 100644
index 0000000..fb71ce5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinatorErrorCode.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.accumulo.core.replication.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+@SuppressWarnings("all") public enum ReplicationCoordinatorErrorCode implements org.apache.thrift.TEnum {
+ NO_AVAILABLE_SERVERS(0),
+ SERVICE_CONFIGURATION_UNAVAILABLE(1),
+ CANNOT_AUTHENTICATE(2);
+
+ private final int value;
+
+ private ReplicationCoordinatorErrorCode(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find the enum type by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ public static ReplicationCoordinatorErrorCode findByValue(int value) {
+ switch (value) {
+ case 0:
+ return NO_AVAILABLE_SERVERS;
+ case 1:
+ return SERVICE_CONFIGURATION_UNAVAILABLE;
+ case 2:
+ return CANNOT_AUTHENTICATE;
+ default:
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b3ef383d/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinatorException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinatorException.java b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinatorException.java
new file mode 100644
index 0000000..d7a74e8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/replication/thrift/ReplicationCoordinatorException.java
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package org.apache.accumulo.core.replication.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class ReplicationCoordinatorException extends TException implements org.apache.thrift.TBase<ReplicationCoordinatorException, ReplicationCoordinatorException._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ReplicationCoordinatorException");
+
+ private static final org.apache.thrift.protocol.TField CODE_FIELD_DESC = new org.apache.thrift.protocol.TField("code", org.apache.thrift.protocol.TType.I32, (short)1);
+ private static final org.apache.thrift.protocol.TField REASON_FIELD_DESC = new org.apache.thrift.protocol.TField("reason", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+ private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+ static {
+ schemes.put(StandardScheme.class, new ReplicationCoordinatorExceptionStandardSchemeFactory());
+ schemes.put(TupleScheme.class, new ReplicationCoordinatorExceptionTupleSchemeFactory());
+ }
+
+ /**
+ *
+ * @see ReplicationCoordinatorErrorCode
+ */
+ public ReplicationCoordinatorErrorCode code; // required
+ public String reason; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ /**
+ *
+ * @see ReplicationCoordinatorErrorCode
+ */
+ CODE((short)1, "code"),
+ REASON((short)2, "reason");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if it is not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // CODE
+ return CODE;
+ case 2: // REASON
+ return REASON;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if it is not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.CODE, new org.apache.thrift.meta_data.FieldMetaData("code", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, ReplicationCoordinatorErrorCode.class)));
+ tmpMap.put(_Fields.REASON, new org.apache.thrift.meta_data.FieldMetaData("reason", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ReplicationCoordinatorException.class, metaDataMap);
+ }
+
+ public ReplicationCoordinatorException() {
+ }
+
+ public ReplicationCoordinatorException(
+ ReplicationCoordinatorErrorCode code,
+ String reason)
+ {
+ this();
+ this.code = code;
+ this.reason = reason;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public ReplicationCoordinatorException(ReplicationCoordinatorException other) {
+ if (other.isSetCode()) {
+ this.code = other.code;
+ }
+ if (other.isSetReason()) {
+ this.reason = other.reason;
+ }
+ }
+
+ public ReplicationCoordinatorException deepCopy() {
+ return new ReplicationCoordinatorException(this);
+ }
+
+ @Override
+ public void clear() {
+ this.code = null;
+ this.reason = null;
+ }
+
+ /**
+ *
+ * @see ReplicationCoordinatorErrorCode
+ */
+ public ReplicationCoordinatorErrorCode getCode() {
+ return this.code;
+ }
+
+ /**
+ *
+ * @see ReplicationCoordinatorErrorCode
+ */
+ public ReplicationCoordinatorException setCode(ReplicationCoordinatorErrorCode code) {
+ this.code = code;
+ return this;
+ }
+
+ public void unsetCode() {
+ this.code = null;
+ }
+
+ /** Returns true if field code is set (has been assigned a value) and false otherwise */
+ public boolean isSetCode() {
+ return this.code != null;
+ }
+
+ public void setCodeIsSet(boolean value) {
+ if (!value) {
+ this.code = null;
+ }
+ }
+
+ public String getReason() {
+ return this.reason;
+ }
+
+ public ReplicationCoordinatorException setReason(String reason) {
+ this.reason = reason;
+ return this;
+ }
+
+ public void unsetReason() {
+ this.reason = null;
+ }
+
+ /** Returns true if field reason is set (has been assigned a value) and false otherwise */
+ public boolean isSetReason() {
+ return this.reason != null;
+ }
+
+ public void setReasonIsSet(boolean value) {
+ if (!value) {
+ this.reason = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case CODE:
+ if (value == null) {
+ unsetCode();
+ } else {
+ setCode((ReplicationCoordinatorErrorCode)value);
+ }
+ break;
+
+ case REASON:
+ if (value == null) {
+ unsetReason();
+ } else {
+ setReason((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case CODE:
+ return getCode();
+
+ case REASON:
+ return getReason();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case CODE:
+ return isSetCode();
+ case REASON:
+ return isSetReason();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof ReplicationCoordinatorException)
+ return this.equals((ReplicationCoordinatorException)that);
+ return false;
+ }
+
+ public boolean equals(ReplicationCoordinatorException that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_code = true && this.isSetCode();
+ boolean that_present_code = true && that.isSetCode();
+ if (this_present_code || that_present_code) {
+ if (!(this_present_code && that_present_code))
+ return false;
+ if (!this.code.equals(that.code))
+ return false;
+ }
+
+ boolean this_present_reason = true && this.isSetReason();
+ boolean that_present_reason = true && that.isSetReason();
+ if (this_present_reason || that_present_reason) {
+ if (!(this_present_reason && that_present_reason))
+ return false;
+ if (!this.reason.equals(that.reason))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ public int compareTo(ReplicationCoordinatorException other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ ReplicationCoordinatorException typedOther = (ReplicationCoordinatorException)other;
+
+ lastComparison = Boolean.valueOf(isSetCode()).compareTo(typedOther.isSetCode());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCode()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.code, typedOther.code);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(isSetReason()).compareTo(typedOther.isSetReason());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetReason()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.reason, typedOther.reason);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("ReplicationCoordinatorException(");
+ boolean first = true;
+
+ sb.append("code:");
+ if (this.code == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.code);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("reason:");
+ if (this.reason == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.reason);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class ReplicationCoordinatorExceptionStandardSchemeFactory implements SchemeFactory {
+ public ReplicationCoordinatorExceptionStandardScheme getScheme() {
+ return new ReplicationCoordinatorExceptionStandardScheme();
+ }
+ }
+
+ /**
+  * Field-tagged serializer for ReplicationCoordinatorException. On read, field id 1 with type
+  * I32 populates {@code code} and field id 2 with type STRING populates {@code reason}.
+  */
+ private static class ReplicationCoordinatorExceptionStandardScheme extends StandardScheme<ReplicationCoordinatorException> {
+
+   /**
+    * Reads fields from {@code iprot} into {@code struct} until a STOP field is seen. Any field
+    * with an unrecognized id, or a recognized id with an unexpected type, is skipped. Calls
+    * {@code struct.validate()} once the struct has been fully read.
+    */
+   public void read(org.apache.thrift.protocol.TProtocol iprot, ReplicationCoordinatorException struct) throws org.apache.thrift.TException {
+     iprot.readStructBegin();
+     for (org.apache.thrift.protocol.TField schemeField = iprot.readFieldBegin();
+         schemeField.type != org.apache.thrift.protocol.TType.STOP;
+         schemeField = iprot.readFieldBegin()) {
+       final boolean isCodeField =
+           schemeField.id == 1 && schemeField.type == org.apache.thrift.protocol.TType.I32;
+       final boolean isReasonField =
+           schemeField.id == 2 && schemeField.type == org.apache.thrift.protocol.TType.STRING;
+
+       if (isCodeField) {
+         // CODE
+         struct.code = ReplicationCoordinatorErrorCode.findByValue(iprot.readI32());
+         struct.setCodeIsSet(true);
+       } else if (isReasonField) {
+         // REASON
+         struct.reason = iprot.readString();
+         struct.setReasonIsSet(true);
+       } else {
+         // Unknown id, or known id carrying the wrong wire type: consume and discard the value.
+         org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+       }
+       iprot.readFieldEnd();
+     }
+     iprot.readStructEnd();
+
+     // check for required fields of primitive type, which can't be checked in the validate method
+     struct.validate();
+   }
+
+   /**
+    * Validates {@code struct}, then writes it to {@code oprot}. The code and reason fields are
+    * each emitted only when non-null, followed by the field stop marker.
+    */
+   public void write(org.apache.thrift.protocol.TProtocol oprot, ReplicationCoordinatorException struct) throws org.apache.thrift.TException {
+     struct.validate();
+
+     oprot.writeStructBegin(STRUCT_DESC);
+
+     final ReplicationCoordinatorErrorCode codeToWrite = struct.code;
+     if (codeToWrite != null) {
+       oprot.writeFieldBegin(CODE_FIELD_DESC);
+       oprot.writeI32(codeToWrite.getValue());
+       oprot.writeFieldEnd();
+     }
+
+     final String reasonToWrite = struct.reason;
+     if (reasonToWrite != null) {
+       oprot.writeFieldBegin(REASON_FIELD_DESC);
+       oprot.writeString(reasonToWrite);
+       oprot.writeFieldEnd();
+     }
+
+     oprot.writeFieldStop();
+     oprot.writeStructEnd();
+   }
+
+ }
+
+ /** Factory that returns a new ReplicationCoordinatorExceptionTupleScheme on every call. */
+ private static class ReplicationCoordinatorExceptionTupleSchemeFactory implements SchemeFactory {
+   public ReplicationCoordinatorExceptionTupleScheme getScheme() {
+     final ReplicationCoordinatorExceptionTupleScheme freshScheme = new ReplicationCoordinatorExceptionTupleScheme();
+     return freshScheme;
+   }
+ }
+
+ /**
+  * Tuple-protocol serializer for ReplicationCoordinatorException. A 2-bit presence set is
+  * written first (bit 0 = code, bit 1 = reason), followed by the values of the present fields.
+  */
+ private static class ReplicationCoordinatorExceptionTupleScheme extends TupleScheme<ReplicationCoordinatorException> {
+
+   /** Writes the presence bits for code and reason, then each field that is set. */
+   @Override
+   public void write(org.apache.thrift.protocol.TProtocol prot, ReplicationCoordinatorException struct) throws org.apache.thrift.TException {
+     final TTupleProtocol oprot = (TTupleProtocol) prot;
+     final BitSet optionals = new BitSet();
+     final boolean hasCode = struct.isSetCode();
+     final boolean hasReason = struct.isSetReason();
+     optionals.set(0, hasCode);
+     optionals.set(1, hasReason);
+     oprot.writeBitSet(optionals, 2);
+
+     if (hasCode) {
+       oprot.writeI32(struct.code.getValue());
+     }
+     if (hasReason) {
+       oprot.writeString(struct.reason);
+     }
+   }
+
+   /** Reads the 2-bit presence set, then each field whose bit is set, in code-then-reason order. */
+   @Override
+   public void read(org.apache.thrift.protocol.TProtocol prot, ReplicationCoordinatorException struct) throws org.apache.thrift.TException {
+     final TTupleProtocol iprot = (TTupleProtocol) prot;
+     final BitSet incoming = iprot.readBitSet(2);
+     final boolean hasCode = incoming.get(0);
+     final boolean hasReason = incoming.get(1);
+
+     if (hasCode) {
+       struct.code = ReplicationCoordinatorErrorCode.findByValue(iprot.readI32());
+       struct.setCodeIsSet(true);
+     }
+     if (hasReason) {
+       struct.reason = iprot.readString();
+       struct.setReasonIsSet(true);
+     }
+   }
+ }
+
+}
+