You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 04:00:08 UTC
[49/50] [abbrv] git commit: ACCUMULO-2574 Change closedTime into
createdTime
ACCUMULO-2574 Change closedTime into createdTime
It's effectively impossible to know the exact point in time when
a WAL is no longer in use by any tablets. We can know when a tablet is
done with a WAL, but many tablets may be referencing it, which complicates
things.
Instead, we can get the same intent (global ordering of WALs for a tserver)
by recording the time at which the WAL was first created, as this is something
that only occurs once in the lifetime of a file.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b53bbf08
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b53bbf08
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b53bbf08
Branch: refs/heads/ACCUMULO-378
Commit: b53bbf08a35cf2ad82c9638fd81a67a671da71ab
Parents: 53e06a2
Author: Josh Elser <el...@apache.org>
Authored: Tue May 20 18:49:49 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 20 18:49:49 2014 -0400
----------------------------------------------------------------------
.../accumulo/core/replication/StatusUtil.java | 74 ++--
.../core/replication/proto/Replication.java | 94 ++---
core/src/main/protobuf/replication.proto | 2 +-
.../core/replication/StatusUtilTest.java | 2 +-
.../apache/accumulo/server/fs/VolumeUtil.java | 2 +-
.../server/replication/StatusCombiner.java | 20 +-
.../server/replication/StatusCombinerTest.java | 32 +-
.../server/util/ReplicationTableUtilTest.java | 7 +-
.../CloseWriteAheadLogReferences.java | 2 +-
.../gc/GarbageCollectWriteAheadLogsTest.java | 23 +-
.../accumulo/gc/GarbageCollectionTest.java | 2 +-
.../CloseWriteAheadLogReferencesTest.java | 4 +-
.../RemoveCompleteReplicationRecords.java | 15 +-
.../master/replication/StatusMaker.java | 7 +-
.../DistributedWorkQueueWorkAssignerTest.java | 4 +-
.../RemoveCompleteReplicationRecordsTest.java | 6 +-
.../replication/SequentialWorkAssignerTest.java | 32 +-
.../master/replication/StatusMakerTest.java | 18 +-
.../master/replication/WorkMakerTest.java | 16 +-
.../org/apache/accumulo/tserver/Tablet.java | 2 +-
.../tserver/log/TabletServerLogger.java | 6 +-
.../test/replication/ReplicationWithGCIT.java | 341 ++++++++++---------
.../replication/ReplicationWithMakerTest.java | 12 +-
.../test/replication/StatusCombinerMacTest.java | 5 +-
test/src/test/resources/log4j.properties | 1 +
25 files changed, 385 insertions(+), 344 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
index 94c60ab..721f1e0 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
@@ -28,47 +28,33 @@ import com.google.protobuf.InvalidProtocolBufferException;
*/
public class StatusUtil {
- private static final Status NEW_REPLICATION_STATUS, INF_END_REPLICATION_STATUS;
- private static final Value NEW_REPLICATION_STATUS_VALUE, INF_END_REPLICATION_STATUS_VALUE;
+ private static final Status INF_END_REPLICATION_STATUS, CLOSED_STATUS;
+ private static final Value INF_END_REPLICATION_STATUS_VALUE, CLOSED_STATUS_VALUE;
- private static final Status.Builder CLOSED_STATUS_BUILDER;
+ private static final Status.Builder CREATED_STATUS_BUILDER;
static {
- Status.Builder builder = Status.newBuilder();
- builder.setBegin(0);
- builder.setEnd(0);
- builder.setInfiniteEnd(false);
- builder.setClosed(false);
- NEW_REPLICATION_STATUS = builder.build();
- NEW_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(NEW_REPLICATION_STATUS);
+ CREATED_STATUS_BUILDER = Status.newBuilder();
+ CREATED_STATUS_BUILDER.setBegin(0);
+ CREATED_STATUS_BUILDER.setEnd(0);
+ CREATED_STATUS_BUILDER.setInfiniteEnd(false);
+ CREATED_STATUS_BUILDER.setClosed(false);
- CLOSED_STATUS_BUILDER = Status.newBuilder();
- CLOSED_STATUS_BUILDER.setBegin(0);
- CLOSED_STATUS_BUILDER.setEnd(0);
- CLOSED_STATUS_BUILDER.setInfiniteEnd(true);
- CLOSED_STATUS_BUILDER.setClosed(true);
-
- builder = Status.newBuilder();
+ Builder builder = Status.newBuilder();
builder.setBegin(0);
builder.setEnd(0);
builder.setInfiniteEnd(true);
builder.setClosed(false);
INF_END_REPLICATION_STATUS = builder.build();
INF_END_REPLICATION_STATUS_VALUE = ProtobufUtil.toValue(INF_END_REPLICATION_STATUS);
- }
- /**
- * @return A {@link Status} which represents a file with no data that is open for writes
- */
- public static Status newFile() {
- return NEW_REPLICATION_STATUS;
- }
-
- /**
- * @return A {@link Value} which represent a file with no data that is open for writes
- */
- public static Value newFileValue() {
- return NEW_REPLICATION_STATUS_VALUE;
+ builder = Status.newBuilder();
+ builder.setBegin(0);
+ builder.setEnd(0);
+ builder.setInfiniteEnd(true);
+ builder.setClosed(true);
+ CLOSED_STATUS = builder.build();
+ CLOSED_STATUS_VALUE = ProtobufUtil.toValue(CLOSED_STATUS);
}
/**
@@ -135,19 +121,33 @@ public class StatusUtil {
}
/**
- * @return A {@link Status} for a closed file of unspecified length, all of which needs replicating.
+ * @return A {@link Status} for a new file that was just created
*/
- public static synchronized Status fileClosed(long timeClosed) {
+ public static synchronized Status fileCreated(long timeCreated) {
// We're using a shared builder, so we need to synchronize access on it until we make a Status (which is then immutable)
- CLOSED_STATUS_BUILDER.setClosedTime(timeClosed);
- return CLOSED_STATUS_BUILDER.build();
+ CREATED_STATUS_BUILDER.setCreatedTime(timeCreated);
+ return CREATED_STATUS_BUILDER.build();
+ }
+
+ /**
+ * @return A {@link Value} for a new file that was just created
+ */
+ public static Value fileCreatedValue(long timeCreated) {
+ return ProtobufUtil.toValue(fileCreated(timeCreated));
+ }
+
+ /**
+ * @return A Status representing a closed file
+ */
+ public static Status fileClosed() {
+ return CLOSED_STATUS;
}
/**
- * @return A {@link Value} for a closed file of unspecified length, all of which needs replicating.
+ * @return A Value representing a closed file
*/
- public static Value fileClosedValue(long timeClosed) {
- return ProtobufUtil.toValue(fileClosed(timeClosed));
+ public static Value fileClosedValue() {
+ return CLOSED_STATUS_VALUE;
}
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java b/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
index d301028..2bff020 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/proto/Replication.java
@@ -99,23 +99,23 @@ package org.apache.accumulo.core.replication.proto;
*/
boolean getClosed();
- // optional int64 closedTime = 5 [default = 0];
+ // optional int64 createdTime = 5 [default = 0];
/**
- * <code>optional int64 closedTime = 5 [default = 0];</code>
+ * <code>optional int64 createdTime = 5 [default = 0];</code>
*
* <pre>
- * when, in ms, was the file closed?
+ * when, in ms, was the file created?
* </pre>
*/
- boolean hasClosedTime();
+ boolean hasCreatedTime();
/**
- * <code>optional int64 closedTime = 5 [default = 0];</code>
+ * <code>optional int64 createdTime = 5 [default = 0];</code>
*
* <pre>
- * when, in ms, was the file closed?
+ * when, in ms, was the file created?
* </pre>
*/
- long getClosedTime();
+ long getCreatedTime();
}
/**
* Protobuf type {@code Status}
@@ -190,7 +190,7 @@ package org.apache.accumulo.core.replication.proto;
}
case 40: {
bitField0_ |= 0x00000010;
- closedTime_ = input.readInt64();
+ createdTime_ = input.readInt64();
break;
}
}
@@ -329,28 +329,28 @@ package org.apache.accumulo.core.replication.proto;
return closed_;
}
- // optional int64 closedTime = 5 [default = 0];
- public static final int CLOSEDTIME_FIELD_NUMBER = 5;
- private long closedTime_;
+ // optional int64 createdTime = 5 [default = 0];
+ public static final int CREATEDTIME_FIELD_NUMBER = 5;
+ private long createdTime_;
/**
- * <code>optional int64 closedTime = 5 [default = 0];</code>
+ * <code>optional int64 createdTime = 5 [default = 0];</code>
*
* <pre>
- * when, in ms, was the file closed?
+ * when, in ms, was the file created?
* </pre>
*/
- public boolean hasClosedTime() {
+ public boolean hasCreatedTime() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
- * <code>optional int64 closedTime = 5 [default = 0];</code>
+ * <code>optional int64 createdTime = 5 [default = 0];</code>
*
* <pre>
- * when, in ms, was the file closed?
+ * when, in ms, was the file created?
* </pre>
*/
- public long getClosedTime() {
- return closedTime_;
+ public long getCreatedTime() {
+ return createdTime_;
}
private void initFields() {
@@ -358,7 +358,7 @@ package org.apache.accumulo.core.replication.proto;
end_ = 0L;
infiniteEnd_ = false;
closed_ = false;
- closedTime_ = 0L;
+ createdTime_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -385,7 +385,7 @@ package org.apache.accumulo.core.replication.proto;
output.writeBool(4, closed_);
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
- output.writeInt64(5, closedTime_);
+ output.writeInt64(5, createdTime_);
}
getUnknownFields().writeTo(output);
}
@@ -414,7 +414,7 @@ package org.apache.accumulo.core.replication.proto;
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
size += com.google.protobuf.CodedOutputStream
- .computeInt64Size(5, closedTime_);
+ .computeInt64Size(5, createdTime_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
@@ -540,7 +540,7 @@ package org.apache.accumulo.core.replication.proto;
bitField0_ = (bitField0_ & ~0x00000004);
closed_ = false;
bitField0_ = (bitField0_ & ~0x00000008);
- closedTime_ = 0L;
+ createdTime_ = 0L;
bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@@ -589,7 +589,7 @@ package org.apache.accumulo.core.replication.proto;
if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
to_bitField0_ |= 0x00000010;
}
- result.closedTime_ = closedTime_;
+ result.createdTime_ = createdTime_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -618,8 +618,8 @@ package org.apache.accumulo.core.replication.proto;
if (other.hasClosed()) {
setClosed(other.getClosed());
}
- if (other.hasClosedTime()) {
- setClosedTime(other.getClosedTime());
+ if (other.hasCreatedTime()) {
+ setCreatedTime(other.getCreatedTime());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
@@ -844,51 +844,51 @@ package org.apache.accumulo.core.replication.proto;
return this;
}
- // optional int64 closedTime = 5 [default = 0];
- private long closedTime_ ;
+ // optional int64 createdTime = 5 [default = 0];
+ private long createdTime_ ;
/**
- * <code>optional int64 closedTime = 5 [default = 0];</code>
+ * <code>optional int64 createdTime = 5 [default = 0];</code>
*
* <pre>
- * when, in ms, was the file closed?
+ * when, in ms, was the file created?
* </pre>
*/
- public boolean hasClosedTime() {
+ public boolean hasCreatedTime() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
/**
- * <code>optional int64 closedTime = 5 [default = 0];</code>
+ * <code>optional int64 createdTime = 5 [default = 0];</code>
*
* <pre>
- * when, in ms, was the file closed?
+ * when, in ms, was the file created?
* </pre>
*/
- public long getClosedTime() {
- return closedTime_;
+ public long getCreatedTime() {
+ return createdTime_;
}
/**
- * <code>optional int64 closedTime = 5 [default = 0];</code>
+ * <code>optional int64 createdTime = 5 [default = 0];</code>
*
* <pre>
- * when, in ms, was the file closed?
+ * when, in ms, was the file created?
* </pre>
*/
- public Builder setClosedTime(long value) {
+ public Builder setCreatedTime(long value) {
bitField0_ |= 0x00000010;
- closedTime_ = value;
+ createdTime_ = value;
onChanged();
return this;
}
/**
- * <code>optional int64 closedTime = 5 [default = 0];</code>
+ * <code>optional int64 createdTime = 5 [default = 0];</code>
*
* <pre>
- * when, in ms, was the file closed?
+ * when, in ms, was the file created?
* </pre>
*/
- public Builder clearClosedTime() {
+ public Builder clearCreatedTime() {
bitField0_ = (bitField0_ & ~0x00000010);
- closedTime_ = 0L;
+ createdTime_ = 0L;
onChanged();
return this;
}
@@ -918,12 +918,12 @@ package org.apache.accumulo.core.replication.proto;
descriptor;
static {
java.lang.String[] descriptorData = {
- "\n#src/main/protobuf/replication.proto\"t\n" +
+ "\n#src/main/protobuf/replication.proto\"u\n" +
"\006Status\022\020\n\005begin\030\001 \001(\003:\0010\022\016\n\003end\030\002 \001(\003:\001" +
"0\022\032\n\013infiniteEnd\030\003 \001(\010:\005false\022\025\n\006closed\030" +
- "\004 \001(\010:\005false\022\025\n\nclosedTime\030\005 \001(\003:\0010B.\n*o" +
- "rg.apache.accumulo.core.replication.prot" +
- "oH\001"
+ "\004 \001(\010:\005false\022\026\n\013createdTime\030\005 \001(\003:\0010B.\n*" +
+ "org.apache.accumulo.core.replication.pro" +
+ "toH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -935,7 +935,7 @@ package org.apache.accumulo.core.replication.proto;
internal_static_Status_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Status_descriptor,
- new java.lang.String[] { "Begin", "End", "InfiniteEnd", "Closed", "ClosedTime", });
+ new java.lang.String[] { "Begin", "End", "InfiniteEnd", "Closed", "CreatedTime", });
return null;
}
};
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/core/src/main/protobuf/replication.proto
----------------------------------------------------------------------
diff --git a/core/src/main/protobuf/replication.proto b/core/src/main/protobuf/replication.proto
index be801b0..7feda58 100644
--- a/core/src/main/protobuf/replication.proto
+++ b/core/src/main/protobuf/replication.proto
@@ -22,5 +22,5 @@ message Status {
optional int64 end = 2 [default = 0]; // offset where data is ready for replication
optional bool infiniteEnd = 3 [default = false]; // do we have a discrete 'end'
optional bool closed = 4 [default = false]; // will more data be appended to the file
- optional int64 closedTime = 5 [default = 0]; // when, in ms, was the file closed?
+ optional int64 createdTime = 5 [default = 0]; // when, in ms, was the file created?
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/core/src/test/java/org/apache/accumulo/core/replication/StatusUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/replication/StatusUtilTest.java b/core/src/test/java/org/apache/accumulo/core/replication/StatusUtilTest.java
index 14609f3..5a35d65 100644
--- a/core/src/test/java/org/apache/accumulo/core/replication/StatusUtilTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/replication/StatusUtilTest.java
@@ -27,7 +27,7 @@ public class StatusUtilTest {
@Test
public void newFileIsNotCompletelyReplicated() {
- Assert.assertFalse(StatusUtil.isSafeForRemoval(StatusUtil.newFile()));
+ Assert.assertFalse(StatusUtil.isSafeForRemoval(StatusUtil.fileCreated(0l)));
}
@Test
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
index f9d43f1..0472fa9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/VolumeUtil.java
@@ -233,7 +233,7 @@ public class VolumeUtil {
Credentials creds = SystemCredentials.get();
MetadataTableUtil.updateTabletVolumes(extent, logsToRemove, logsToAdd, filesToRemove, filesToAdd, switchedDir, zooLock, creds);
if (replicate) {
- Status status = StatusUtil.fileClosed(System.currentTimeMillis());
+ Status status = StatusUtil.fileClosed();
log.debug("Tablet directory switched, need to record old log files " + logsToRemove + " " + ProtobufUtil.toString(status));
// Before deleting these logs, we need to mark them for replication
ReplicationTableUtil.updateLogs(creds, extent, logsToRemove, status);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java
index 694664e..ecca99e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/replication/StatusCombiner.java
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.TypedValueCombiner;
import org.apache.accumulo.core.iterators.ValueFormatException;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.replication.proto.Replication.Status.Builder;
import org.apache.log4j.Logger;
@@ -91,7 +92,7 @@ public class StatusCombiner extends TypedValueCombiner<Status> {
if (null == combined) {
if (!iter.hasNext()) {
if (log.isTraceEnabled()) {
- log.trace("Returned single value: " + key.toStringNoTruncate() + " " + status.toString().replace("\n", ", "));
+ log.trace("Returned single value: " + key.toStringNoTruncate() + " " + ProtobufUtil.toString(status));
}
return status;
} else {
@@ -104,7 +105,7 @@ public class StatusCombiner extends TypedValueCombiner<Status> {
}
if (log.isTraceEnabled()) {
- log.trace("Combined: " + key.toStringNoTruncate() + " " + combined.build().toString().replace("\n", ", "));
+ log.trace("Combined: " + key.toStringNoTruncate() + " " + ProtobufUtil.toString(combined.build()));
}
return combined.build();
@@ -135,18 +136,19 @@ public class StatusCombiner extends TypedValueCombiner<Status> {
// persist the infinite end
combined.setInfiniteEnd(combined.getInfiniteEnd() | status.getInfiniteEnd());
- // only set the closedTime if the new status has it defined
- if (status.hasClosedTime()) {
- // choose the minimum (earliest) closedTime seen
- if (combined.hasClosedTime()) {
- combined.setClosedTime(Math.min(combined.getClosedTime(), status.getClosedTime()));
+ // only set the createdTime if the new status has it defined
+ if (status.hasCreatedTime()) {
+ // choose the minimum (earliest) createdTime seen
+ if (combined.hasCreatedTime()) {
+ combined.setCreatedTime(Math.min(combined.getCreatedTime(), status.getCreatedTime()));
} else {
- combined.setClosedTime(status.getClosedTime());
+ combined.setCreatedTime(status.getCreatedTime());
}
}
}
private String builderToString(Builder builder) {
- return "begin: " + builder.getBegin() + ", end: " + builder.getEnd() + ", infiniteEnd: " + builder.getInfiniteEnd() + ", closed: "+ builder.getClosed();
+ return "begin: " + builder.getBegin() + ", end: " + builder.getEnd() + ", infiniteEnd: " + builder.getInfiniteEnd() + ", closed: " + builder.getClosed()
+ + ", createdTime: " + builder.getCreatedTime();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
index d74e2c6..5bc2488 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/replication/StatusCombinerTest.java
@@ -86,27 +86,29 @@ public class StatusCombinerTest {
@Test
public void newStatusWithNewIngest() {
- Status orig = StatusUtil.newFile();
+ Status orig = StatusUtil.fileCreated(100);
Status status = StatusUtil.replicatedAndIngested(10, 20);
Status ret = combiner.typedReduce(key, Arrays.asList(orig, status).iterator());
Assert.assertEquals(10l, ret.getBegin());
Assert.assertEquals(20l, ret.getEnd());
+ Assert.assertEquals(100l, ret.getCreatedTime());
Assert.assertEquals(false, ret.getClosed());
}
@Test
public void newStatusWithNewIngestSingleBuilder() {
- Status orig = StatusUtil.newFile();
+ Status orig = StatusUtil.fileCreated(100);
Status status = StatusUtil.replicatedAndIngested(builder, 10, 20);
Status ret = combiner.typedReduce(key, Arrays.asList(orig, status).iterator());
Assert.assertEquals(10l, ret.getBegin());
Assert.assertEquals(20l, ret.getEnd());
+ Assert.assertEquals(100l, ret.getCreatedTime());
Assert.assertEquals(false, ret.getClosed());
}
@Test
public void commutativeNewFile() {
- Status newFile = StatusUtil.newFile(), firstSync = StatusUtil.ingestedUntil(100), secondSync = StatusUtil.ingestedUntil(200);
+ Status newFile = StatusUtil.fileCreated(100), firstSync = StatusUtil.ingestedUntil(100), secondSync = StatusUtil.ingestedUntil(200);
Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, firstSync, secondSync).iterator()), order2 = combiner.typedReduce(key,
Arrays.asList(secondSync, firstSync, newFile).iterator());
@@ -116,7 +118,7 @@ public class StatusCombinerTest {
@Test
public void commutativeNewFileSingleBuilder() {
- Status newFile = StatusUtil.newFile(), firstSync = StatusUtil.ingestedUntil(builder, 100), secondSync = StatusUtil.ingestedUntil(builder, 200);
+ Status newFile = StatusUtil.fileCreated(100), firstSync = StatusUtil.ingestedUntil(builder, 100), secondSync = StatusUtil.ingestedUntil(builder, 200);
Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, firstSync, secondSync).iterator()), order2 = combiner.typedReduce(key,
Arrays.asList(secondSync, firstSync, newFile).iterator());
@@ -126,7 +128,7 @@ public class StatusCombinerTest {
@Test
public void commutativeNewUpdates() {
- Status newFile = StatusUtil.newFile(), firstSync = StatusUtil.ingestedUntil(100), secondSync = StatusUtil.ingestedUntil(200);
+ Status newFile = StatusUtil.fileCreated(100), firstSync = StatusUtil.ingestedUntil(100), secondSync = StatusUtil.ingestedUntil(200);
Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, firstSync, secondSync).iterator()), order2 = combiner.typedReduce(key,
Arrays.asList(newFile, secondSync, firstSync).iterator());
@@ -136,7 +138,7 @@ public class StatusCombinerTest {
@Test
public void commutativeNewUpdatesSingleBuilder() {
- Status newFile = StatusUtil.newFile(), firstSync = StatusUtil.ingestedUntil(builder, 100), secondSync = StatusUtil.ingestedUntil(builder, 200);
+ Status newFile = StatusUtil.fileCreated(100), firstSync = StatusUtil.ingestedUntil(builder, 100), secondSync = StatusUtil.ingestedUntil(builder, 200);
Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, firstSync, secondSync).iterator()), order2 = combiner.typedReduce(key,
Arrays.asList(newFile, secondSync, firstSync).iterator());
@@ -146,7 +148,7 @@ public class StatusCombinerTest {
@Test
public void commutativeWithClose() {
- Status newFile = StatusUtil.newFile(), closed = StatusUtil.fileClosed(System.currentTimeMillis()), secondSync = StatusUtil.ingestedUntil(200);
+ Status newFile = StatusUtil.fileCreated(100), closed = StatusUtil.fileClosed(), secondSync = StatusUtil.ingestedUntil(200);
Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, closed, secondSync).iterator()), order2 = combiner.typedReduce(key,
Arrays.asList(newFile, secondSync, closed).iterator());
@@ -156,7 +158,7 @@ public class StatusCombinerTest {
@Test
public void commutativeWithCloseSingleBuilder() {
- Status newFile = StatusUtil.newFile(), closed = StatusUtil.fileClosed(System.currentTimeMillis()), secondSync = StatusUtil.ingestedUntil(builder, 200);
+ Status newFile = StatusUtil.fileCreated(100), closed = StatusUtil.fileClosed(), secondSync = StatusUtil.ingestedUntil(builder, 200);
Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, closed, secondSync).iterator()), order2 = combiner.typedReduce(key,
Arrays.asList(newFile, secondSync, closed).iterator());
@@ -166,7 +168,7 @@ public class StatusCombinerTest {
@Test
public void commutativeWithMultipleUpdates() {
- Status newFile = StatusUtil.newFile(), update1 = StatusUtil.ingestedUntil(100), update2 = StatusUtil.ingestedUntil(200), repl1 = StatusUtil.replicated(50), repl2 = StatusUtil
+ Status newFile = StatusUtil.fileCreated(100), update1 = StatusUtil.ingestedUntil(100), update2 = StatusUtil.ingestedUntil(200), repl1 = StatusUtil.replicated(50), repl2 = StatusUtil
.replicated(150);
Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, update1, repl1, update2, repl2).iterator());
@@ -189,7 +191,7 @@ public class StatusCombinerTest {
@Test
public void commutativeWithMultipleUpdatesSingleBuilder() {
- Status newFile = StatusUtil.newFile(), update1 = StatusUtil.ingestedUntil(builder, 100), update2 = StatusUtil.ingestedUntil(builder, 200), repl1 = StatusUtil
+ Status newFile = StatusUtil.fileCreated(100), update1 = StatusUtil.ingestedUntil(builder, 100), update2 = StatusUtil.ingestedUntil(builder, 200), repl1 = StatusUtil
.replicated(builder, 50), repl2 = StatusUtil.replicated(builder, 150);
Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, update1, repl1, update2, repl2).iterator());
@@ -212,7 +214,7 @@ public class StatusCombinerTest {
@Test
public void duplicateStatuses() {
- Status newFile = StatusUtil.newFile(), update1 = StatusUtil.ingestedUntil(builder, 100), update2 = StatusUtil.ingestedUntil(builder, 200), repl1 = StatusUtil
+ Status newFile = StatusUtil.fileCreated(100), update1 = StatusUtil.ingestedUntil(builder, 100), update2 = StatusUtil.ingestedUntil(builder, 200), repl1 = StatusUtil
.replicated(builder, 50), repl2 = StatusUtil.replicated(builder, 150);
Status order1 = combiner.typedReduce(key, Arrays.asList(newFile, update1, repl1, update2, repl2).iterator());
@@ -225,7 +227,7 @@ public class StatusCombinerTest {
@Test
public void fileClosedTimePropagated() {
- Status stat1 = Status.newBuilder().setBegin(10).setEnd(20).setClosed(true).setInfiniteEnd(false).setClosedTime(50).build();
+ Status stat1 = Status.newBuilder().setBegin(10).setEnd(20).setClosed(true).setInfiniteEnd(false).setCreatedTime(50).build();
Status stat2 = Status.newBuilder().setBegin(10).setEnd(20).setClosed(true).setInfiniteEnd(false).build();
Status combined = combiner.typedReduce(key, Arrays.asList(stat1, stat2).iterator());
@@ -235,14 +237,14 @@ public class StatusCombinerTest {
@Test
public void fileClosedTimeChoosesEarliestIgnoringDefault() {
- Status stat1 = Status.newBuilder().setBegin(10).setEnd(20).setClosed(true).setInfiniteEnd(false).setClosedTime(50).build();
- Status stat2 = Status.newBuilder().setBegin(10).setEnd(20).setClosed(true).setInfiniteEnd(false).setClosedTime(100).build();
+ Status stat1 = Status.newBuilder().setBegin(10).setEnd(20).setClosed(true).setInfiniteEnd(false).setCreatedTime(50).build();
+ Status stat2 = Status.newBuilder().setBegin(10).setEnd(20).setClosed(true).setInfiniteEnd(false).setCreatedTime(100).build();
Status combined = combiner.typedReduce(key, Arrays.asList(stat1, stat2).iterator());
Assert.assertEquals(stat1, combined);
- Status stat3 = Status.newBuilder().setBegin(10).setEnd(20).setClosed(true).setInfiniteEnd(false).setClosedTime(100).build();
+ Status stat3 = Status.newBuilder().setBegin(10).setEnd(20).setClosed(true).setInfiniteEnd(false).setCreatedTime(100).build();
Status combined2 = combiner.typedReduce(key, Arrays.asList(combined, stat3).iterator());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
index 7aa53b6..3e3332f 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/ReplicationTableUtilTest.java
@@ -86,7 +86,8 @@ public class ReplicationTableUtilTest {
UUID uuid = UUID.randomUUID();
String myFile = "file:////home/user/accumulo/wal/server+port/" + uuid;
- ReplicationTableUtil.updateFiles(creds, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.newFile());
+ long createdTime = System.currentTimeMillis();
+ ReplicationTableUtil.updateFiles(creds, new KeyExtent(new Text("1"), null, null), Collections.singleton(myFile), StatusUtil.fileCreated(createdTime));
verify(writer);
@@ -101,13 +102,13 @@ public class ReplicationTableUtilTest {
Assert.assertEquals(MetadataSchema.ReplicationSection.COLF, new Text(update.getColumnFamily()));
Assert.assertEquals("1", new Text(update.getColumnQualifier()).toString());
- Assert.assertEquals(StatusUtil.newFileValue(), new Value(update.getValue()));
+ Assert.assertEquals(StatusUtil.fileCreatedValue(createdTime), new Value(update.getValue()));
}
@Test
public void replEntryMutation() {
// We stopped using a WAL -- we need a reference that this WAL needs to be replicated completely
- Status stat = StatusUtil.fileClosed(System.currentTimeMillis());
+ Status stat = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setCreatedTime(System.currentTimeMillis()).build();
String file = "file:///accumulo/wal/127.0.0.1+9997" + UUID.randomUUID();
Path filePath = new Path(file);
Text row = new Text(filePath.toString());
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 294883e..d649c3e 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -250,7 +250,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
protected void closeWal(BatchWriter bw, Key k) throws MutationsRejectedException {
log.debug("Closing unreferenced WAL ({}) in metadata table", k.toStringNoTruncate());
Mutation m = new Mutation(k.getRow());
- m.put(k.getColumnFamily(), k.getColumnQualifier(), StatusUtil.fileClosedValue(System.currentTimeMillis()));
+ m.put(k.getColumnFamily(), k.getColumnQualifier(), StatusUtil.fileClosedValue());
bw.addMutation(m);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 2faa8a2..ab877fc 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -324,7 +324,7 @@ public class GarbageCollectWriteAheadLogsTest {
// Write a Status record which should prevent file1 from being deleted
LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.newFileValue()));
+ replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
ReplicationGCWAL replGC = new ReplicationGCWAL(instance, volMgr, false, replData);
@@ -338,7 +338,7 @@ public class GarbageCollectWriteAheadLogsTest {
assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
// The file is closed but not replicated, must be retained
- replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue(System.currentTimeMillis())));
+ replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
// File is closed and fully replicated, can be deleted
@@ -360,12 +360,14 @@ public class GarbageCollectWriteAheadLogsTest {
ReplicationTable.create(conn);
+ long file1CreateTime = System.currentTimeMillis();
+ long file2CreateTime = file1CreateTime + 50;
BatchWriter bw = conn.createBatchWriter(ReplicationTable.NAME, new BatchWriterConfig());
Mutation m = new Mutation("/wals/" + file1);
- StatusSection.add(m, new Text("1"), StatusUtil.newFileValue());
+ StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
bw.addMutation(m);
m = new Mutation("/wals/" + file2);
- StatusSection.add(m, new Text("1"), StatusUtil.newFileValue());
+ StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
bw.addMutation(m);
// These WALs are potential candidates for deletion from fs
@@ -402,14 +404,16 @@ public class GarbageCollectWriteAheadLogsTest {
ReplicationTable.create(conn);
+ long file1CreateTime = System.currentTimeMillis();
+ long file2CreateTime = file1CreateTime + 50;
// Write some records to the metadata table, we haven't yet written status records to the replication table
BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
bw.addMutation(m);
m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
bw.addMutation(m);
// These WALs are potential candidates for deletion from fs
@@ -442,7 +446,7 @@ public class GarbageCollectWriteAheadLogsTest {
String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
bw.addMutation(m);
bw.close();
@@ -460,16 +464,17 @@ public class GarbageCollectWriteAheadLogsTest {
Connector conn = inst.getConnector("root", new PasswordToken(""));
ReplicationTable.create(conn);
+ long walCreateTime = System.currentTimeMillis();
String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
bw.addMutation(m);
bw.close();
bw = ReplicationTable.getBatchWriter(conn);
m = new Mutation(wal);
- StatusSection.add(m, new Text("1"), StatusUtil.newFileValue());
+ StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
bw.addMutation(m);
bw.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
index 4b6b4fc..51b9596 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectionTest.java
@@ -605,7 +605,7 @@ public class GarbageCollectionTest {
gce.candidates.add("hdfs://foo.com:6000/accumulo/tables/2/t-00002/A000002.rf");
// We replicated all of the data, but we might still write more data to the file
- Status status = StatusUtil.newFile();
+ Status status = StatusUtil.fileCreated(System.currentTimeMillis());
gce.filesToReplicate.put("hdfs://foo.com:6000/accumulo/tables/1/t-00001/A000001.rf", status);
gca.collect(gce);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 67dcc31..66152c4 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -312,7 +312,7 @@ public class CloseWriteAheadLogReferencesTest {
ReplicationTable.create(conn);
BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "file:/accumulo/wal/tserver+port/12345");
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
bw.addMutation(m);
bw.close();
@@ -334,7 +334,7 @@ public class CloseWriteAheadLogReferencesTest {
ReplicationTable.create(conn);
BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
- m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.newFileValue());
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
bw.addMutation(m);
bw.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
index dd5ccd8..f7246d7 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecords.java
@@ -18,7 +18,6 @@ package org.apache.accumulo.master.replication;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -147,7 +146,7 @@ public class RemoveCompleteReplicationRecords implements Runnable {
}
Mutation m = new Mutation(row);
- Map<String,Long> tableToTimeClosed = new HashMap<>();
+ Map<String,Long> tableToTimeCreated = new HashMap<>();
for (Entry<Key,Value> entry : columns.entrySet()) {
Status status = null;
try {
@@ -178,12 +177,12 @@ public class RemoveCompleteReplicationRecords implements Runnable {
throw new RuntimeException("Got unexpected column");
}
- if (status.hasClosedTime()) {
- Long timeClosed = tableToTimeClosed.get(tableId);
+ if (status.hasCreatedTime()) {
+ Long timeClosed = tableToTimeCreated.get(tableId);
if (null == timeClosed) {
- tableToTimeClosed.put(tableId, status.getClosedTime());
- } else if (timeClosed != status.getClosedTime()){
- log.warn("Found multiple values for timeClosed for {}: {} and {}", row, timeClosed, status.getClosedTime());
+ tableToTimeCreated.put(tableId, status.getCreatedTime());
+ } else if (timeClosed != status.getCreatedTime()){
+ log.warn("Found multiple values for timeClosed for {}: {} and {}", row, timeClosed, status.getCreatedTime());
}
}
@@ -194,7 +193,7 @@ public class RemoveCompleteReplicationRecords implements Runnable {
List<Mutation> mutations = new ArrayList<>();
mutations.add(m);
- for (Entry<String,Long> entry : tableToTimeClosed.entrySet()) {
+ for (Entry<String,Long> entry : tableToTimeCreated.entrySet()) {
log.info("Removing order mutation for table {} at {} for {}", entry.getKey(), entry.getValue(), row.toString());
Mutation orderMutation = OrderSection.createMutation(row.toString(), entry.getValue());
orderMutation.putDelete(OrderSection.NAME, new Text(entry.getKey()));
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
index 6bc5962..c054de2 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/StatusMaker.java
@@ -193,13 +193,14 @@ public class StatusMaker {
*/
protected boolean addOrderRecord(Text file, Text tableId, Status stat, Value value) {
try {
- if (!stat.hasClosedTime()) {
- log.warn("Status record ({}) for {} in table {} was written to metadata table which was closed but lacked closedTime", ProtobufUtil.toString(stat), file, tableId);
+ if (!stat.hasCreatedTime()) {
+ log.error("Status record ({}) for {} in table {} was written to metadata table which lacked createdTime", ProtobufUtil.toString(stat), file, tableId);
+ return false;
}
log.info("Creating order record for {} for {} with {}", file, tableId, ProtobufUtil.toString(stat));
- Mutation m = OrderSection.createMutation(file.toString(), stat.getClosedTime());
+ Mutation m = OrderSection.createMutation(file.toString(), stat.getCreatedTime());
OrderSection.add(m, tableId, value);
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
index 46c5691..ddd4810 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssignerTest.java
@@ -211,11 +211,11 @@ public class DistributedWorkQueueWorkAssignerTest {
String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
Mutation m = new Mutation(file1);
- WorkSection.add(m, serializedTarget1, StatusUtil.newFileValue());
+ WorkSection.add(m, serializedTarget1, StatusUtil.fileCreatedValue(5));
bw.addMutation(m);
m = new Mutation(file2);
- WorkSection.add(m, serializedTarget2, StatusUtil.newFileValue());
+ WorkSection.add(m, serializedTarget2, StatusUtil.fileCreatedValue(10));
bw.addMutation(m);
bw.close();
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
index 373062c..1cd30f8 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/RemoveCompleteReplicationRecordsTest.java
@@ -203,7 +203,7 @@ public class RemoveCompleteReplicationRecordsTest {
long time = System.currentTimeMillis();
// Write out numRecords entries to both replication and metadata tables, none of which are fully replicated
for (int i = 0; i < numRecords; i++) {
- builder.setClosedTime(time++);
+ builder.setCreatedTime(time++);
String file = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
Mutation m = new Mutation(file);
Value v = ProtobufUtil.toValue(builder.setBegin(1000*(i+1)).build());
@@ -224,7 +224,7 @@ public class RemoveCompleteReplicationRecordsTest {
filesToRemove.add(fileToRemove);
Mutation m = new Mutation(fileToRemove);
ReplicationTarget target = new ReplicationTarget("peer1", "5", "5");
- Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setClosedTime(time).build());
+ Value value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
StatusSection.add(m, new Text("5"), value);
WorkSection.add(m, target.toText(), value);
replBw.addMutation(m);
@@ -239,7 +239,7 @@ public class RemoveCompleteReplicationRecordsTest {
fileToRemove = "/accumulo/wal/tserver+port/" + UUID.randomUUID();
filesToRemove.add(fileToRemove);
m = new Mutation(fileToRemove);
- value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setClosedTime(time).build());
+ value = ProtobufUtil.toValue(builder.setBegin(10000).setEnd(10000).setClosed(true).setCreatedTime(time).build());
target = new ReplicationTarget("peer1", "6", "6");
StatusSection.add(m, new Text("6"), value);
WorkSection.add(m, target.toText(), value);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
index 0820e1c..dce0aa6 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/SequentialWorkAssignerTest.java
@@ -95,8 +95,8 @@ public class SequentialWorkAssignerTest {
String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
// File1 was closed before file2, however
- Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
- Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
+ Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+ Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
Mutation m = new Mutation(file1);
WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
@@ -106,11 +106,11 @@ public class SequentialWorkAssignerTest {
WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
bw.addMutation(m);
- m = OrderSection.createMutation(file1, stat1.getClosedTime());
+ m = OrderSection.createMutation(file1, stat1.getCreatedTime());
OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
bw.addMutation(m);
- m = OrderSection.createMutation(file2, stat2.getClosedTime());
+ m = OrderSection.createMutation(file2, stat2.getCreatedTime());
OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
bw.addMutation(m);
@@ -168,8 +168,8 @@ public class SequentialWorkAssignerTest {
String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
// File1 was closed before file2, however
- Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
- Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
+ Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+ Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
Mutation m = new Mutation(file1);
WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
@@ -179,11 +179,11 @@ public class SequentialWorkAssignerTest {
WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
bw.addMutation(m);
- m = OrderSection.createMutation(file1, stat1.getClosedTime());
+ m = OrderSection.createMutation(file1, stat1.getCreatedTime());
OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
bw.addMutation(m);
- m = OrderSection.createMutation(file2, stat2.getClosedTime());
+ m = OrderSection.createMutation(file2, stat2.getCreatedTime());
OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
bw.addMutation(m);
@@ -248,8 +248,8 @@ public class SequentialWorkAssignerTest {
String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
// File1 was closed before file2, however
- Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
- Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
+ Status stat1 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+ Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
Mutation m = new Mutation(file1);
WorkSection.add(m, serializedTarget1, ProtobufUtil.toValue(stat1));
@@ -259,11 +259,11 @@ public class SequentialWorkAssignerTest {
WorkSection.add(m, serializedTarget2, ProtobufUtil.toValue(stat2));
bw.addMutation(m);
- m = OrderSection.createMutation(file1, stat1.getClosedTime());
+ m = OrderSection.createMutation(file1, stat1.getCreatedTime());
OrderSection.add(m, new Text(target1.getSourceTableId()), ProtobufUtil.toValue(stat1));
bw.addMutation(m);
- m = OrderSection.createMutation(file2, stat2.getClosedTime());
+ m = OrderSection.createMutation(file2, stat2.getCreatedTime());
OrderSection.add(m, new Text(target2.getSourceTableId()), ProtobufUtil.toValue(stat2));
bw.addMutation(m);
@@ -369,8 +369,8 @@ public class SequentialWorkAssignerTest {
String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
// File1 was closed before file2, however
- Status stat1 = Status.newBuilder().setBegin(100).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(250).build();
- Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setClosedTime(500).build();
+ Status stat1 = Status.newBuilder().setBegin(100).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(250).build();
+ Status stat2 = Status.newBuilder().setBegin(0).setEnd(100).setClosed(true).setInfiniteEnd(false).setCreatedTime(500).build();
Mutation m = new Mutation(file1);
WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat1));
@@ -380,11 +380,11 @@ public class SequentialWorkAssignerTest {
WorkSection.add(m, serializedTarget, ProtobufUtil.toValue(stat2));
bw.addMutation(m);
- m = OrderSection.createMutation(file1, stat1.getClosedTime());
+ m = OrderSection.createMutation(file1, stat1.getCreatedTime());
OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat1));
bw.addMutation(m);
- m = OrderSection.createMutation(file2, stat2.getClosedTime());
+ m = OrderSection.createMutation(file2, stat2.getCreatedTime());
OrderSection.add(m, new Text(target.getSourceTableId()), ProtobufUtil.toValue(stat2));
bw.addMutation(m);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
index e0fc421..4c0ab2b 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/StatusMakerTest.java
@@ -78,12 +78,16 @@ public class StatusMakerTest {
Map<String,Integer> fileToTableId = new HashMap<>();
int index = 1;
+ long timeCreated = 0;
+ Map<String,Long> fileToTimeCreated = new HashMap<>();
for (String file : files) {
Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
- m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.newFileValue());
+ m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
+ fileToTimeCreated.put(file, timeCreated);
bw.addMutation(m);
fileToTableId.put(file, index);
index++;
+ timeCreated++;
}
bw.close();
@@ -102,7 +106,9 @@ public class StatusMakerTest {
Assert.assertTrue("Found unexpected file: " + file, files.contains(file.toString()));
Assert.assertEquals(fileToTableId.get(file.toString()), new Integer(tableId.toString()));
- Assert.assertEquals(StatusUtil.newFile(), Status.parseFrom(entry.getValue().get()));
+ timeCreated = fileToTimeCreated.get(file.toString());
+ Assert.assertNotNull(timeCreated);
+ Assert.assertEquals(StatusUtil.fileCreated(timeCreated), Status.parseFrom(entry.getValue().get()));
}
}
@@ -123,12 +129,14 @@ public class StatusMakerTest {
Map<String,Integer> fileToTableId = new HashMap<>();
int index = 1;
+ long timeCreated = 0;
for (String file : files) {
Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
- m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.newFileValue());
+ m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), StatusUtil.fileCreatedValue(timeCreated));
bw.addMutation(m);
fileToTableId.put(file, index);
index++;
+ timeCreated++;
}
bw.close();
@@ -160,7 +168,7 @@ public class StatusMakerTest {
walPrefix + UUID.randomUUID());
Map<String,Integer> fileToTableId = new HashMap<>();
- Status stat = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+ Status stat = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).setCreatedTime(System.currentTimeMillis()).build();
int index = 1;
for (String file : files) {
@@ -212,7 +220,7 @@ public class StatusMakerTest {
int index = 1;
long time = System.currentTimeMillis();
for (String file : files) {
- statBuilder.setClosedTime(time++);
+ statBuilder.setCreatedTime(time++);
Mutation m = new Mutation(ReplicationSection.getRowPrefix() + file);
m.put(ReplicationSection.COLF, new Text(Integer.toString(index)), ProtobufUtil.toValue(statBuilder.build()));
bw.addMutation(m);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
index 337aa12..0c4afc9 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
@@ -84,9 +84,9 @@ public class WorkMakerTest {
String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
// Create a status record for a file
- long timeClosed = System.currentTimeMillis();
+ long timeCreated = System.currentTimeMillis();
Mutation m = new Mutation(new Path(file).toString());
- m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue(timeClosed));
+ m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(timeCreated));
BatchWriter bw = ReplicationTable.getBatchWriter(conn);
bw.addMutation(m);
bw.flush();
@@ -101,7 +101,7 @@ public class WorkMakerTest {
// Invoke the addWorkRecord method to create a Work record from the Status record earlier
ReplicationTarget expected = new ReplicationTarget("remote_cluster_1", "4", tableId);
workMaker.setBatchWriter(bw);
- workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(timeClosed), ImmutableMap.of("remote_cluster_1", "4"), tableId);
+ workMaker.addWorkRecord(new Text(file), StatusUtil.fileCreatedValue(timeCreated), ImmutableMap.of("remote_cluster_1", "4"), tableId);
// Scan over just the WorkSection
s = ReplicationTable.getScanner(conn);
@@ -114,7 +114,7 @@ public class WorkMakerTest {
Assert.assertEquals(file, workKey.getRow().toString());
Assert.assertEquals(WorkSection.NAME, workKey.getColumnFamily());
Assert.assertEquals(expected, actual);
- Assert.assertEquals(workEntry.getValue(), StatusUtil.fileClosedValue(timeClosed));
+ Assert.assertEquals(workEntry.getValue(), StatusUtil.fileCreatedValue(timeCreated));
}
@Test
@@ -127,7 +127,7 @@ public class WorkMakerTest {
String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
Mutation m = new Mutation(new Path(file).toString());
- m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue(System.currentTimeMillis()));
+ m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
BatchWriter bw = ReplicationTable.getBatchWriter(conn);
bw.addMutation(m);
bw.flush();
@@ -145,7 +145,7 @@ public class WorkMakerTest {
expectedTargets.add(new ReplicationTarget(cluster.getKey(), cluster.getValue(), tableId));
}
workMaker.setBatchWriter(bw);
- workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(System.currentTimeMillis()), targetClusters, tableId);
+ workMaker.addWorkRecord(new Text(file), StatusUtil.fileCreatedValue(System.currentTimeMillis()), targetClusters, tableId);
s = ReplicationTable.getScanner(conn);
WorkSection.limit(s);
@@ -175,7 +175,7 @@ public class WorkMakerTest {
String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
Mutation m = new Mutation(new Path(file).toString());
- m.put(StatusSection.NAME, new Text(tableId), StatusUtil.newFileValue());
+ m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
BatchWriter bw = ReplicationTable.getBatchWriter(conn);
bw.addMutation(m);
bw.flush();
@@ -205,7 +205,7 @@ public class WorkMakerTest {
public void closedStatusRecordsStillMakeWork() throws Exception {
WorkMaker workMaker = new WorkMaker(conn);
- Assert.assertFalse(workMaker.shouldCreateWork(StatusUtil.newFile()));
+ Assert.assertFalse(workMaker.shouldCreateWork(StatusUtil.fileCreated(System.currentTimeMillis())));
Assert.assertTrue(workMaker.shouldCreateWork(StatusUtil.ingestedUntil(1000)));
Assert.assertTrue(workMaker.shouldCreateWork(Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build()));
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
index 418f679..73e99e2 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java
@@ -1408,7 +1408,7 @@ public class Tablet {
// Ensure that we write a record marking each WAL as requiring replication to make sure we don't abandon the data
if (ReplicationConfigurationUtil.isEnabled(extent, tabletServer.getTableConfiguration(extent))) {
- Status status = StatusUtil.fileClosed(System.currentTimeMillis());
+ Status status = StatusUtil.fileClosed();
for (LogEntry logEntry : logEntries) {
log.debug("Writing closed status to metadata table for " + logEntry.logSet + " " + ProtobufUtil.toString(status));
ReplicationTableUtil.updateFiles(SystemCredentials.get(), extent, logEntry.logSet, status);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index d8c4279..b7b0aff 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
@@ -274,9 +275,10 @@ public class TabletServerLogger {
for (DfsLogger logger : copy) {
logs.add(logger.getFileName());
}
- log.debug("Writing " + ProtobufUtil.toString(StatusUtil.newFile()) + " to replication table for " + logs);
+ Status status = StatusUtil.fileCreated(System.currentTimeMillis());
+ log.debug("Writing " + ProtobufUtil.toString(status) + " to replication table for " + logs);
// Got some new WALs, note this in the replication table
- ReplicationTableUtil.updateFiles(SystemCredentials.get(), commitSession.getExtent(), logs, StatusUtil.newFile());
+ ReplicationTableUtil.updateFiles(SystemCredentials.get(), commitSession.getExtent(), logs, status);
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 2a7a210..9047533 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -227,7 +227,8 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
// Fake that each one is fully replicated
Mutation m = new Mutation(entry.getKey().getRow());
- m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(), StatusUtil.newFileValue());
+ m.put(entry.getKey().getColumnFamily().toString(), entry.getKey().getColumnQualifier().toString(),
+ StatusUtil.fileCreatedValue(System.currentTimeMillis()));
bw.addMutation(m);
}
bw.close();
@@ -287,7 +288,7 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
}
Assert.fail("Expected all replication records in the metadata table to be closed");
}
-
+
for (int i = 0; i < 10; i++) {
allClosed = true;
@@ -328,7 +329,6 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
gc.waitFor();
}
-
}
@Test(timeout = 5 * 60 * 1000)
@@ -345,197 +345,210 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
// replication shouldn't exist when we begin
Assert.assertFalse(conn.tableOperations().exists(ReplicationTable.NAME));
- // Create two tables
- conn.tableOperations().create(table1);
+ ReplicationTablesPrinterThread thread = new ReplicationTablesPrinterThread(conn, System.out);
+ thread.start();
- int attempts = 5;
- while (attempts > 0) {
- try {
- // Enable replication on table1
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
- // Replicate table1 to cluster1 in the table with id of '4'
- conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
- // Use the MockReplicaSystem impl and sleep for 5seconds
- conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
- ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
- attempts = 0;
- } catch (Exception e) {
- attempts--;
- if (attempts <= 0) {
- throw e;
+ try {
+ // Create the table to replicate
+ conn.tableOperations().create(table1);
+
+ int attempts = 5;
+ while (attempts > 0) {
+ try {
+ // Enable replication on table1
+ conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
+ // Replicate table1 to cluster1 in the table with id of '4'
+ conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
+ // Use the MockReplicaSystem impl and sleep for 5 seconds
+ conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+ ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "5000"));
+ attempts = 0;
+ } catch (Exception e) {
+ attempts--;
+ if (attempts <= 0) {
+ throw e;
+ }
+ UtilWaitThread.sleep(500);
}
- UtilWaitThread.sleep(500);
}
- }
- String tableId = conn.tableOperations().tableIdMap().get(table1);
- Assert.assertNotNull("Could not determine table id for " + table1, tableId);
+ String tableId = conn.tableOperations().tableIdMap().get(table1);
+ Assert.assertNotNull("Could not determine table id for " + table1, tableId);
- // Write some data to table1
- BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
- for (int rows = 0; rows < 2000; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 50; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
+ // Write some data to table1
+ BatchWriter bw = conn.createBatchWriter(table1, new BatchWriterConfig());
+ for (int rows = 0; rows < 2000; rows++) {
+ Mutation m = new Mutation(Integer.toString(rows));
+ for (int cols = 0; cols < 50; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
}
- bw.addMutation(m);
- }
- bw.close();
+ bw.close();
- // Make sure the replication table exists at this point
- boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
- attempts = 10;
- do {
- if (!exists) {
- UtilWaitThread.sleep(1000);
- exists = conn.tableOperations().exists(ReplicationTable.NAME);
- attempts--;
- }
- } while (!exists && attempts > 0);
- Assert.assertTrue("Replication table did not exist", exists);
+ // Make sure the replication table exists at this point
+ boolean exists = conn.tableOperations().exists(ReplicationTable.NAME);
+ attempts = 10;
+ do {
+ if (!exists) {
+ UtilWaitThread.sleep(1000);
+ exists = conn.tableOperations().exists(ReplicationTable.NAME);
+ attempts--;
+ }
+ } while (!exists && attempts > 0);
+ Assert.assertTrue("Replication table did not exist", exists);
+
+ // Grant ourselves the write permission for later
+ conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+ // Find the WorkSection record that will be created for that data we ingested
+ boolean notFound = true;
+ Scanner s;
+ for (int i = 0; i < 10 && notFound; i++) {
+ try {
+ s = ReplicationTable.getScanner(conn);
+ WorkSection.limit(s);
+ Entry<Key,Value> e = Iterables.getOnlyElement(s);
+ Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
+ Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
+ notFound = false;
+ } catch (NoSuchElementException e) {
+
+ } catch (IllegalArgumentException e) {
+ // Somehow we got more than one element. Log what they were
+ s = ReplicationTable.getScanner(conn);
+ for (Entry<Key,Value> content : s) {
+ log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+ }
+ Assert.fail("Found more than one work section entry");
+ } catch (RuntimeException e) {
+ // Catch a propagation issue, fail if it's not what we expect
+ Throwable cause = e.getCause();
+ if (cause instanceof AccumuloSecurityException) {
+ AccumuloSecurityException sec = (AccumuloSecurityException) cause;
+ switch (sec.getSecurityErrorCode()) {
+ case PERMISSION_DENIED:
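+ // intended to retry because the grant didn't happen yet -- NOTE(review): there is no break here, so this falls through to default and rethrows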
+ log.warn("Sleeping because permission was denied");
+ default:
+ throw e;
+ }
+ } else {
+ throw e;
+ }
+ }
- // Grant ourselves the write permission for later
- conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+ Thread.sleep(1000);
+ }
- // Find the WorkSection record that will be created for that data we ingested
- boolean notFound = true;
- Scanner s;
- for (int i = 0; i < 10 && notFound; i++) {
- try {
- s = ReplicationTable.getScanner(conn);
- WorkSection.limit(s);
- Entry<Key,Value> e = Iterables.getOnlyElement(s);
- Text expectedColqual = new ReplicationTarget("cluster1", "4", tableId).toText();
- Assert.assertEquals(expectedColqual, e.getKey().getColumnQualifier());
- notFound = false;
- } catch (NoSuchElementException e) {
-
- } catch (IllegalArgumentException e) {
- // Somehow we got more than one element. Log what they were
+ if (notFound) {
s = ReplicationTable.getScanner(conn);
for (Entry<Key,Value> content : s) {
log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
}
- Assert.fail("Found more than one work section entry");
- } catch (RuntimeException e) {
- // Catch a propagation issue, fail if it's not what we expect
- Throwable cause = e.getCause();
- if (cause instanceof AccumuloSecurityException) {
- AccumuloSecurityException sec = (AccumuloSecurityException) cause;
- switch (sec.getSecurityErrorCode()) {
- case PERMISSION_DENIED:
- // retry -- the grant didn't happen yet
- log.warn("Sleeping because permission was denied");
- default:
- throw e;
- }
- } else {
- throw e;
- }
+ Assert.assertFalse("Did not find the work entry for the status entry", notFound);
}
- Thread.sleep(1000);
- }
-
- if (notFound) {
- s = ReplicationTable.getScanner(conn);
- for (Entry<Key,Value> content : s) {
- log.info(content.getKey().toStringNoTruncate() + " => " + content.getValue());
+ /**
+ * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Compacting the table should close all
+ * open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the metadata table
+ * anymore
+ */
+
+ log.info("Killing tserver");
+ // Kill the tserver(s) and restart them
+ // to ensure that the WALs we previously observed all move to closed.
+ for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
+ cluster.killProcess(ServerType.TABLET_SERVER, proc);
}
- Assert.assertFalse("Did not find the work entry for the status entry", notFound);
- }
-
- /**
- * By this point, we should have data ingested into a table, with at least one WAL as a candidate for replication. Compacting the table should close all
- * open WALs, which should ensure all records we're going to replicate have entries in the replication table, and nothing will exist in the metadata table
- * anymore
- */
-
- log.info("Killing tserver");
- // Kill the tserver(s) and restart them
- // to ensure that the WALs we previously observed all move to closed.
- for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
- cluster.killProcess(ServerType.TABLET_SERVER, proc);
- }
-
- log.info("Starting tserver");
- cluster.exec(TabletServer.class);
- log.info("Waiting to read tables");
+ log.info("Starting tserver");
+ cluster.exec(TabletServer.class);
- // Make sure we can read all the tables (recovery complete)
- for (String table : new String[] {MetadataTable.NAME, table1}) {
- s = conn.createScanner(table, new Authorizations());
- for (@SuppressWarnings("unused")
- Entry<Key,Value> entry : s) {}
- }
+ log.info("Waiting to read tables");
- log.info("Checking for replication entries in replication");
- // Then we need to get those records over to the replication table
- boolean foundResults = false;
- for (int i = 0; i < 5; i++) {
- s = ReplicationTable.getScanner(conn);
- int count = 0;
- for (Entry<Key,Value> entry : s) {
- count++;
- log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+ // Make sure we can read all the tables (recovery complete)
+ for (String table : new String[] {MetadataTable.NAME, table1}) {
+ s = conn.createScanner(table, new Authorizations());
+ for (@SuppressWarnings("unused")
+ Entry<Key,Value> entry : s) {}
}
- if (count > 0) {
- foundResults = true;
- break;
- }
- Thread.sleep(1000);
- }
- Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
-
- getCluster().exec(SimpleGarbageCollector.class);
-
- // Wait for a bit since the GC has to run (should be running after a one second delay)
- Thread.sleep(5000);
-
- // We expect no records in the metadata table after compaction. We have to poll
- // because we have to wait for the StatusMaker's next iteration which will clean
- // up the dangling *closed* records after we create the record in the replication table.
- // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
- log.info("Checking metadata table for replication entries");
- foundResults = true;
- for (int i = 0; i < 5; i++) {
- s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
- s.setRange(ReplicationSection.getRange());
- if (Iterables.size(s) == 0) {
- foundResults = false;
- break;
+ log.info("Checking for replication entries in replication");
+ // Then we need to get those records over to the replication table
+ boolean foundResults = false;
+ for (int i = 0; i < 5; i++) {
+ s = ReplicationTable.getScanner(conn);
+ int count = 0;
+ for (Entry<Key,Value> entry : s) {
+ count++;
+ log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+ }
+ if (count > 0) {
+ foundResults = true;
+ break;
+ }
+ Thread.sleep(1000);
}
- Thread.sleep(1000);
- }
- Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
+ Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
- /**
- * After we close out and subsequently delete the metadata record, this will propagate to the replication table,
- * which will cause those records to be deleted after repliation occurs
- */
+ getCluster().exec(SimpleGarbageCollector.class);
- int recordsFound = 0;
- for (int i = 0; i < 10; i++) {
- s = ReplicationTable.getScanner(conn);
- recordsFound = 0;
- for (Entry<Key,Value> entry : s) {
- recordsFound++;
- log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
- }
+ // Wait for a bit since the GC has to run (should be running after a one second delay)
+ Thread.sleep(5000);
- if (0 == recordsFound) {
- break;
- } else {
+ // We expect no records in the metadata table after compaction. We have to poll
+ // because we have to wait for the StatusMaker's next iteration which will clean
+ // up the dangling *closed* records after we create the record in the replication table.
+ // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
+ log.info("Checking metadata table for replication entries");
+ foundResults = true;
+ for (int i = 0; i < 5; i++) {
+ s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ s.setRange(ReplicationSection.getRange());
+ long size = 0;
+ for (Entry<Key,Value> e : s) {
+ size++;
+ log.info("{}={}", e.getKey().toStringNoTruncate(), ProtobufUtil.toString(Status.parseFrom(e.getValue().get())));
+ }
+ if (size == 0) {
+ foundResults = false;
+ break;
+ }
Thread.sleep(1000);
log.info("");
}
- }
- Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
+ Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
+
+ /**
+ * After we close out and subsequently delete the metadata record, this will propagate to the replication table, which will cause those records to be
+ * deleted after replication occurs
+ */
+
+ int recordsFound = 0;
+ for (int i = 0; i < 10; i++) {
+ s = ReplicationTable.getScanner(conn);
+ recordsFound = 0;
+ for (Entry<Key,Value> entry : s) {
+ recordsFound++;
+ log.info(entry.getKey().toStringNoTruncate() + " " + Status.parseFrom(entry.getValue().get()).toString().replace("\n", ", "));
+ }
+
+ if (0 == recordsFound) {
+ break;
+ } else {
+ Thread.sleep(1000);
+ log.info("");
+ }
+ }
+
+ Assert.assertEquals("Found unexpected replication records in the replication table", 0, recordsFound);
+ } finally {
+ thread.join(200);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/b53bbf08/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
index 03ac72c..70d6ca1 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithMakerTest.java
@@ -23,10 +23,12 @@ import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.replication.ReplicaSystemFactory;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
import org.apache.accumulo.core.replication.ReplicationTarget;
@@ -91,6 +93,8 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION.getKey(), "true");
// Replicate table1 to cluster1 in the table with id of '4'
conn.tableOperations().setProperty(table1, Property.TABLE_REPLICATION_TARGETS.getKey() + "cluster1", "4");
+ conn.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + "cluster1",
+ ReplicaSystemFactory.getPeerConfigurationValue(MockReplicaSystem.class, "100000"));
break;
} catch (Exception e) {
attempts--;
@@ -136,7 +140,7 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
// Trigger the minor compaction, waiting for it to finish.
// This should write the entry to metadata that the file has data
- conn.tableOperations().flush(ReplicationTable.NAME, null, null, true);
+ conn.tableOperations().flush(table1, null, null, true);
// Make sure that we have one status element, should be a new file
Scanner s = ReplicationTable.getScanner(conn);
@@ -148,7 +152,8 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
while (null == entry && attempts > 0) {
try {
entry = Iterables.getOnlyElement(s);
- if (!expectedStatus.equals(Status.parseFrom(entry.getValue().get()))) {
+ Status actual = Status.parseFrom(entry.getValue().get());
+ if (actual.getInfiniteEnd() != expectedStatus.getInfiniteEnd()) {
entry = null;
// the master process didn't yet fire and write the new mutation, wait for it to do
// so and try to read it again
@@ -171,7 +176,8 @@ public class ReplicationWithMakerTest extends ConfigurableMacIT {
}
Assert.assertNotNull("Could not find expected entry in replication table", entry);
- Assert.assertEquals("Expected to find a replication entry that is open with infinite length", expectedStatus, Status.parseFrom(entry.getValue().get()));
+ Status actual = Status.parseFrom(entry.getValue().get());
+ Assert.assertTrue("Expected to find a replication entry that is open with infinite length: " + ProtobufUtil.toString(actual), !actual.getClosed() && actual.getInfiniteEnd());
// Try a couple of times to watch for the work record to be created
boolean notFound = true;