You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2018/05/08 15:36:28 UTC
[accumulo] 03/04: fixes #449 code review updates (#458)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit a8fead21fb970fc0ad7d93153eb72e4a30a739de
Author: Keith Turner <kt...@apache.org>
AuthorDate: Wed May 2 18:56:26 2018 -0400
fixes #449 code review updates (#458)
---
.../LogEvents.java => log/CloseableIterator.java} | 14 +-
.../org/apache/accumulo/tserver/log/DfsLogger.java | 8 +-
.../accumulo/tserver/log/RecoveryLogReader.java | 115 ++++++++++---
.../accumulo/tserver/log/RecoveryLogsIterator.java | 84 ++-------
.../accumulo/tserver/log/SortedLogRecovery.java | 22 +--
.../apache/accumulo/tserver/logger/LogEvents.java | 3 -
.../apache/accumulo/tserver/logger/LogFileKey.java | 46 ++---
.../apache/accumulo/tserver/logger/LogReader.java | 14 +-
.../tserver/replication/AccumuloReplicaSystem.java | 6 +-
.../accumulo/tserver/log/LogEventsTest.java} | 25 ++-
.../accumulo/tserver/log/LogFileKeyTest.java | 96 +++++++++++
.../tserver/log/RecoveryLogsReaderTest.java | 33 ++++
.../tserver/log/SortedLogRecoveryTest.java | 189 ++++++++++++++++++++-
.../accumulo/tserver/logger/LogFileTest.java | 14 +-
.../replication/AccumuloReplicaSystemTest.java | 32 ++--
.../BatchWriterReplicationReplayerTest.java | 4 +-
.../UnusedWalDoesntCloseReplicationStatusIT.java | 2 +-
17 files changed, 517 insertions(+), 190 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/CloseableIterator.java
similarity index 66%
copy from server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java
copy to server/tserver/src/main/java/org/apache/accumulo/tserver/log/CloseableIterator.java
index 044629e..59623dc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/CloseableIterator.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.accumulo.tserver.logger;
+package org.apache.accumulo.tserver.log;
-public enum LogEvents {
- // TODO add unit test to verify ordinals, rather than rely on dubious comments
- // TODO if possible, rename COMPACTION to "FLUSH" (or at least "MINC") without changing
- // serialization
- // DO NOT CHANGE ORDER OF ENUMS, ORDER IS USED IN SERIALIZATION
- OPEN, DEFINE_TABLET, MUTATION, MANY_MUTATIONS, COMPACTION_START, COMPACTION_FINISH;
+import java.io.IOException;
+import java.util.Iterator;
+
+public interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
+ @Override
+ public void close() throws IOException;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
index 6aa4964..263f0c8 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/DfsLogger.java
@@ -601,7 +601,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
final LogFileKey key = new LogFileKey();
key.event = DEFINE_TABLET;
key.seq = seq;
- key.tid = tid;
+ key.tabletId = tid;
key.tablet = tablet;
try {
write(key, EMPTY);
@@ -662,7 +662,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
LogFileKey key = new LogFileKey();
key.event = MANY_MUTATIONS;
key.seq = tabletMutations.getSeq();
- key.tid = tabletMutations.getTid();
+ key.tabletId = tabletMutations.getTid();
LogFileValue value = new LogFileValue();
value.mutations = tabletMutations.getMutations();
data.add(new Pair<>(key, value));
@@ -688,7 +688,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
LogFileKey key = new LogFileKey();
key.event = COMPACTION_FINISH;
key.seq = seq;
- key.tid = tid;
+ key.tabletId = tid;
return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
}
@@ -697,7 +697,7 @@ public class DfsLogger implements Comparable<DfsLogger> {
LogFileKey key = new LogFileKey();
key.event = COMPACTION_START;
key.seq = seq;
- key.tid = tid;
+ key.tabletId = tid;
key.filename = fqfn;
return logFileData(Collections.singletonList(new Pair<>(key, EMPTY)), durability);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
index ab8b631..718fa98 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
@@ -25,6 +25,7 @@ import java.util.Objects;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogEvents;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.commons.collections.buffer.PriorityBuffer;
@@ -37,7 +38,10 @@ import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
/**
* A class which reads sorted recovery logs produced from a single WAL.
@@ -46,7 +50,7 @@ import com.google.common.base.Preconditions;
* directory. The primary purpose of this class is to merge the results of multiple Reduce jobs that
* result in Map output files.
*/
-public class RecoveryLogReader {
+public class RecoveryLogReader implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
/**
* Group together the next key/value from a Reader with the Reader
@@ -107,8 +111,14 @@ public class RecoveryLogReader {
}
private PriorityBuffer heap = new PriorityBuffer();
+ private Iterator<Entry<LogFileKey,LogFileValue>> iter;
public RecoveryLogReader(VolumeManager fs, Path directory) throws IOException {
+ this(fs, directory, null, null);
+ }
+
+ public RecoveryLogReader(VolumeManager fs, Path directory, LogFileKey start, LogFileKey end)
+ throws IOException {
boolean foundFinish = false;
for (FileStatus child : fs.listStatus(directory)) {
if (child.getPath().getName().startsWith("_"))
@@ -123,6 +133,8 @@ public class RecoveryLogReader {
if (!foundFinish)
throw new IOException(
"Sort \"" + SortedLogState.FINISHED.getMarker() + "\" flag not found in " + directory);
+
+ iter = new SortCheckIterator(new RangeIterator(start, end));
}
private static void copy(Writable src, Writable dest) throws IOException {
@@ -134,7 +146,8 @@ public class RecoveryLogReader {
dest.readFields(input);
}
- public synchronized boolean next(WritableComparable<?> key, Writable val) throws IOException {
+ @VisibleForTesting
+ synchronized boolean next(WritableComparable<?> key, Writable val) throws IOException {
Index elt = (Index) heap.remove();
try {
elt.cache();
@@ -151,6 +164,7 @@ public class RecoveryLogReader {
return true;
}
+ @VisibleForTesting
synchronized boolean seek(WritableComparable<?> key) throws IOException {
PriorityBuffer reheap = new PriorityBuffer(heap.size());
boolean result = false;
@@ -171,7 +185,8 @@ public class RecoveryLogReader {
return result;
}
- void close() throws IOException {
+ @Override
+ public void close() throws IOException {
IOException problem = null;
for (Object obj : heap) {
Index index = (Index) obj;
@@ -186,39 +201,75 @@ public class RecoveryLogReader {
heap = null;
}
- volatile boolean returnedIterator = false;
+ /**
+ * Ensures source iterator provides data in sorted order
+ */
+ @VisibleForTesting
+ static class SortCheckIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {
- // TODO make this primary entry into this class, and remove volatile boolean and make rest private
- Iterator<Entry<LogFileKey,LogFileValue>> getIterator(LogFileKey start, LogFileKey end)
- throws IOException {
- Preconditions.checkState(!returnedIterator, "Each reader can have only one iterator");
- returnedIterator = true;
- return new RecoveryLogReaderIterator(this, start, end);
+ private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
+
+ SortCheckIterator(Iterator<Entry<LogFileKey,LogFileValue>> source) {
+ this.source = Iterators.peekingIterator(source);
+
+ }
+
+ @Override
+ public boolean hasNext() {
+ return source.hasNext();
+ }
+
+ @Override
+ public Entry<LogFileKey,LogFileValue> next() {
+ Entry<LogFileKey,LogFileValue> next = source.next();
+ if (source.hasNext()) {
+ Preconditions.checkState(next.getKey().compareTo(source.peek().getKey()) <= 0,
+ "Keys not in order %s %s", next.getKey(), source.peek().getKey());
+ }
+ return next;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
}
- private static class RecoveryLogReaderIterator
- implements Iterator<Entry<LogFileKey,LogFileValue>> {
+ private class RangeIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {
- private RecoveryLogReader reader;
private LogFileKey key = new LogFileKey();
private LogFileValue value = new LogFileValue();
private boolean hasNext;
private LogFileKey end;
- RecoveryLogReaderIterator(RecoveryLogReader reader, LogFileKey start, LogFileKey end)
- throws IOException {
- this.reader = reader;
+ private boolean next(LogFileKey key, LogFileValue value) throws IOException {
+ try {
+ return RecoveryLogReader.this.next(key, value);
+ } catch (EOFException e) {
+ return false;
+ }
+ }
+
+ RangeIterator(LogFileKey start, LogFileKey end) throws IOException {
this.end = end;
- reader.seek(start);
+ if (start != null) {
+ hasNext = next(key, value);
+
+ if (hasNext && key.event != LogEvents.OPEN) {
+ throw new IllegalStateException("First log entry value is not OPEN");
+ }
+
+ seek(start);
+ }
- hasNext = reader.next(key, value);
+ hasNext = next(key, value);
- if (hasNext && key.compareTo(start) < 0) {
+ if (hasNext && start != null && key.compareTo(start) < 0) {
throw new IllegalStateException("First key is less than start " + key + " " + start);
}
- if (hasNext && key.compareTo(end) > 0) {
+ if (hasNext && end != null && key.compareTo(end) > 0) {
hasNext = false;
}
}
@@ -236,8 +287,8 @@ public class RecoveryLogReader {
key = new LogFileKey();
value = new LogFileValue();
try {
- hasNext = reader.next(key, value);
- if (hasNext && key.compareTo(end) > 0) {
+ hasNext = next(key, value);
+ if (hasNext && end != null && key.compareTo(end) > 0) {
hasNext = false;
}
} catch (IOException e) {
@@ -246,6 +297,26 @@ public class RecoveryLogReader {
return entry;
}
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Entry<LogFileKey,LogFileValue> next() {
+ return iter.next();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
index 95ee0c5..a68a4ec 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -16,12 +16,9 @@
*/
package org.apache.accumulo.tserver.log;
-import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
@@ -32,94 +29,30 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
import com.google.common.collect.UnmodifiableIterator;
/**
* Iterates over multiple sorted recovery logs merging them into a single sorted stream.
*/
-public class RecoveryLogsIterator
- implements Iterator<Entry<LogFileKey,LogFileValue>>, AutoCloseable {
+public class RecoveryLogsIterator implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
private static final Logger LOG = LoggerFactory.getLogger(RecoveryLogsIterator.class);
- private List<RecoveryLogReader> readers;
+ List<CloseableIterator<Entry<LogFileKey,LogFileValue>>> iterators;
private UnmodifiableIterator<Entry<LogFileKey,LogFileValue>> iter;
/**
- * Ensures source iterator provides data in sorted order
- */
- // TODO add unit test and move to RecoveryLogReader
- @VisibleForTesting
- static class SortCheckIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {
-
- private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
- private String sourceName;
-
- SortCheckIterator(String sourceName, Iterator<Entry<LogFileKey,LogFileValue>> source) {
- this.source = Iterators.peekingIterator(source);
- this.sourceName = sourceName;
- }
-
- @Override
- public boolean hasNext() {
- return source.hasNext();
- }
-
- @Override
- public Entry<LogFileKey,LogFileValue> next() {
- Entry<LogFileKey,LogFileValue> next = source.next();
- if (source.hasNext()) {
- Preconditions.checkState(next.getKey().compareTo(source.peek().getKey()) <= 0,
- "Data source %s keys not in order %s %s", sourceName, next.getKey(),
- source.peek().getKey());
- }
- return next;
- }
- }
-
- // TODO get rid of this (push down into iterator in RecoveryLogReader)
- private RecoveryLogReader open(VolumeManager fs, Path log) throws IOException {
- RecoveryLogReader reader = new RecoveryLogReader(fs, log);
- LogFileKey key = new LogFileKey();
- LogFileValue value = new LogFileValue();
- if (!reader.next(key, value)) {
- reader.close();
- return null;
- }
- if (key.event != OPEN) {
- RuntimeException e = new IllegalStateException(
- "First log entry value is not OPEN (" + log + ")");
- try {
- reader.close();
- } catch (Exception e2) {
- e.addSuppressed(e2);
- }
- throw e;
- }
-
- return reader;
- }
-
- /**
- * Iterates only over keys between [start,end].
+ * Iterates only over keys in the range [start,end].
*/
RecoveryLogsIterator(VolumeManager fs, List<Path> recoveryLogPaths, LogFileKey start,
LogFileKey end) throws IOException {
- readers = new ArrayList<>(recoveryLogPaths.size());
- ArrayList<Iterator<Entry<LogFileKey,LogFileValue>>> iterators = new ArrayList<>();
+ iterators = new ArrayList<>(recoveryLogPaths.size());
try {
for (Path log : recoveryLogPaths) {
- RecoveryLogReader reader = open(fs, log);
- if (reader != null) {
- readers.add(reader);
- iterators.add(new SortCheckIterator(log.getName(), reader.getIterator(start, end)));
- }
+ iterators.add(new RecoveryLogReader(fs, log, start, end));
}
iter = Iterators.mergeSorted(iterators, new Comparator<Entry<LogFileKey,LogFileValue>>() {
@@ -150,8 +83,13 @@ public class RecoveryLogsIterator
}
@Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
+ }
+
+ @Override
public void close() {
- for (RecoveryLogReader reader : readers) {
+ for (CloseableIterator<?> reader : iterators) {
try {
reader.close();
} catch (IOException e) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
index f84c906..5692a81 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/SortedLogRecovery.java
@@ -64,28 +64,28 @@ public class SortedLogRecovery {
static LogFileKey maxKey(LogEvents event) {
LogFileKey key = new LogFileKey();
key.event = event;
- key.tid = Integer.MAX_VALUE;
+ key.tabletId = Integer.MAX_VALUE;
key.seq = Long.MAX_VALUE;
return key;
}
static LogFileKey maxKey(LogEvents event, int tabletId) {
LogFileKey key = maxKey(event);
- key.tid = tabletId;
+ key.tabletId = tabletId;
return key;
}
static LogFileKey minKey(LogEvents event) {
LogFileKey key = new LogFileKey();
key.event = event;
- key.tid = 0;
+ key.tabletId = 0;
key.seq = 0;
return key;
}
static LogFileKey minKey(LogEvents event, int tabletId) {
LogFileKey key = minKey(event);
- key.tid = tabletId;
+ key.tabletId = tabletId;
return key;
}
@@ -106,12 +106,12 @@ public class SortedLogRecovery {
checkState(key.event == DEFINE_TABLET); // should only fail if bug elsewhere
if (key.tablet.equals(extent) || key.tablet.equals(alternative)) {
- checkState(key.tid >= 0, "Tid %s for %s is negative", key.tid, extent);
- checkState(tabletId == -1 || key.tid >= tabletId); // should only fail if bug in
+ checkState(key.tabletId >= 0, "tabletId %s for %s is negative", key.tabletId, extent);
+ checkState(tabletId == -1 || key.tabletId >= tabletId); // should only fail if bug in
// RecoveryLogsIterator
- if (tabletId != key.tid) {
- tabletId = key.tid;
+ if (tabletId != key.tabletId) {
+ tabletId = key.tabletId;
}
}
}
@@ -175,7 +175,7 @@ public class SortedLogRecovery {
LogFileKey key = ddi.next().getKey();
checkState(key.seq >= 0, "Unexpected negative seq %s for tabletId %s", key.seq, tabletId);
- checkState(key.tid == tabletId); // should only fail if bug elsewhere
+ checkState(key.tabletId == tabletId); // should only fail if bug elsewhere
if (key.event == COMPACTION_START) {
checkState(key.seq >= lastStart); // should only fail if bug elsewhere
@@ -186,7 +186,7 @@ public class SortedLogRecovery {
firstEventWasFinish = true;
} else if (lastEvent == COMPACTION_FINISH) {
throw new IllegalStateException(
- "Saw consecutive COMPACTION_FINISH events " + key.tid + " " + key.seq);
+ "Saw consecutive COMPACTION_FINISH events " + key.tabletId + " " + key.seq);
} else {
if (key.seq <= lastStart) {
throw new IllegalStateException(
@@ -230,7 +230,7 @@ public class SortedLogRecovery {
while (rli.hasNext()) {
Entry<LogFileKey,LogFileValue> entry = rli.next();
- checkState(entry.getKey().tid == tabletId); // should only fail if bug elsewhere
+ checkState(entry.getKey().tabletId == tabletId); // should only fail if bug elsewhere
checkState(entry.getKey().seq >= recoverySeq); // should only fail if bug elsewhere
if (entry.getKey().event == MUTATION) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java
index 044629e..137fe06 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java
@@ -17,9 +17,6 @@
package org.apache.accumulo.tserver.logger;
public enum LogEvents {
- // TODO add unit test to verify ordinals, rather than rely on dubious comments
- // TODO if possible, rename COMPACTION to "FLUSH" (or at least "MINC") without changing
- // serialization
// DO NOT CHANGE ORDER OF ENUMS, ORDER IS USED IN SERIALIZATION
OPEN, DEFINE_TABLET, MUTATION, MANY_MUTATIONS, COMPACTION_START, COMPACTION_FINISH;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
index 399874a..7eb00cd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogFileKey.java
@@ -34,7 +34,7 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
public String filename = null;
public KeyExtent tablet = null;
public long seq = -1;
- public int tid = -1; // TODO rename to tabletId
+ public int tabletId = -1;
public static final int VERSION = 2;
public String tserverSession;
@@ -48,35 +48,35 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
event = LogEvents.values()[value];
switch (event) {
case OPEN:
- tid = in.readInt();
+ tabletId = in.readInt();
tserverSession = in.readUTF();
- if (tid != VERSION) {
- throw new RuntimeException(String
- .format("Bad version number for log file: expected %d, but saw %d", VERSION, tid));
+ if (tabletId != VERSION) {
+ throw new RuntimeException(String.format(
+ "Bad version number for log file: expected %d, but saw %d", VERSION, tabletId));
}
break;
case COMPACTION_FINISH:
seq = in.readLong();
- tid = in.readInt();
+ tabletId = in.readInt();
break;
case COMPACTION_START:
seq = in.readLong();
- tid = in.readInt();
+ tabletId = in.readInt();
filename = in.readUTF();
break;
case DEFINE_TABLET:
seq = in.readLong();
- tid = in.readInt();
+ tabletId = in.readInt();
tablet = new KeyExtent();
tablet.readFields(in);
break;
case MANY_MUTATIONS:
seq = in.readLong();
- tid = in.readInt();
+ tabletId = in.readInt();
break;
case MUTATION:
seq = in.readLong();
- tid = in.readInt();
+ tabletId = in.readInt();
break;
default:
throw new RuntimeException("Unknown log event type: " + event);
@@ -90,32 +90,32 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
switch (event) {
case OPEN:
seq = -1;
- tid = -1;
+ tabletId = -1;
out.writeInt(VERSION);
out.writeUTF(tserverSession);
// out.writeUTF(Accumulo.getInstanceID());
break;
case COMPACTION_FINISH:
out.writeLong(seq);
- out.writeInt(tid);
+ out.writeInt(tabletId);
break;
case COMPACTION_START:
out.writeLong(seq);
- out.writeInt(tid);
+ out.writeInt(tabletId);
out.writeUTF(filename);
break;
case DEFINE_TABLET:
out.writeLong(seq);
- out.writeInt(tid);
+ out.writeInt(tabletId);
tablet.write(out);
break;
case MANY_MUTATIONS:
out.writeLong(seq);
- out.writeInt(tid);
+ out.writeInt(tabletId);
break;
case MUTATION:
out.writeLong(seq);
- out.writeInt(tid);
+ out.writeInt(tabletId);
break;
default:
throw new IllegalArgumentException("Bad value for LogFileEntry type");
@@ -151,8 +151,8 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
}
if (this.event == OPEN)
return 0;
- if (this.tid != o.tid) {
- return this.tid - o.tid;
+ if (this.tabletId != o.tabletId) {
+ return this.tabletId - o.tabletId;
}
return sign(this.seq - o.seq);
}
@@ -176,15 +176,15 @@ public class LogFileKey implements WritableComparable<LogFileKey> {
case OPEN:
return String.format("OPEN %s", tserverSession);
case COMPACTION_FINISH:
- return String.format("COMPACTION_FINISH %d %d", tid, seq);
+ return String.format("COMPACTION_FINISH %d %d", tabletId, seq);
case COMPACTION_START:
- return String.format("COMPACTION_START %d %d %s", tid, seq, filename);
+ return String.format("COMPACTION_START %d %d %s", tabletId, seq, filename);
case MUTATION:
- return String.format("MUTATION %d %d", tid, seq);
+ return String.format("MUTATION %d %d", tabletId, seq);
case MANY_MUTATIONS:
- return String.format("MANY_MUTATIONS %d %d", tid, seq);
+ return String.format("MANY_MUTATIONS %d %d", tabletId, seq);
case DEFINE_TABLET:
- return String.format("DEFINE_TABLET %d %d %s", tid, seq, tablet);
+ return String.format("DEFINE_TABLET %d %d %s", tabletId, seq, tablet);
}
throw new RuntimeException("Unknown type of entry: " + event);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index ef26a36..ce465ce 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -129,9 +130,12 @@ public class LogReader {
}
} else {
// read the log entries sorted in a map file
- RecoveryLogReader input = new RecoveryLogReader(fs, path);
- while (input.next(key, value)) {
- printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
+ try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
+ while (input.hasNext()) {
+ Entry<LogFileKey,LogFileValue> entry = input.next();
+ printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+ opts.maxMutations);
+ }
}
}
}
@@ -143,11 +147,11 @@ public class LogReader {
if (ke != null) {
if (key.event == LogEvents.DEFINE_TABLET) {
if (key.tablet.equals(ke)) {
- tabletIds.add(key.tid);
+ tabletIds.add(key.tabletId);
} else {
return;
}
- } else if (!tabletIds.contains(key.tid)) {
+ } else if (!tabletIds.contains(key.tabletId)) {
return;
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index c2505ff..613b13d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -720,7 +720,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
switch (key.event) {
case DEFINE_TABLET:
if (target.getSourceTableId().equals(key.tablet.getTableId())) {
- desiredTids.add(key.tid);
+ desiredTids.add(key.tabletId);
}
break;
default:
@@ -772,13 +772,13 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
case DEFINE_TABLET:
// For new DEFINE_TABLETs, we also need to record the new tids we see
if (target.getSourceTableId().equals(key.tablet.getTableId())) {
- desiredTids.add(key.tid);
+ desiredTids.add(key.tabletId);
}
break;
case MUTATION:
case MANY_MUTATIONS:
// Only write out mutations for tids that are for the desired tablet
- if (desiredTids.contains(key.tid)) {
+ if (desiredTids.contains(key.tabletId)) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEventsTest.java
similarity index 56%
copy from server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java
copy to server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEventsTest.java
index 044629e..f3bc904 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogEvents.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogEventsTest.java
@@ -14,12 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.accumulo.tserver.logger;
+package org.apache.accumulo.tserver.log;
-public enum LogEvents {
- // TODO add unit test to verify ordinals, rather than rely on dubious comments
- // TODO if possible, rename COMPACTION to "FLUSH" (or at least "MINC") without changing
- // serialization
- // DO NOT CHANGE ORDER OF ENUMS, ORDER IS USED IN SERIALIZATION
- OPEN, DEFINE_TABLET, MUTATION, MANY_MUTATIONS, COMPACTION_START, COMPACTION_FINISH;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LogEventsTest {
+ @Test
+ public void testOrdinals() {
+ // Ordinals are used for persistence, so it's important they are stable.
+
+ LogEvents[] expectedOrder = new LogEvents[] {LogEvents.OPEN, LogEvents.DEFINE_TABLET,
+ LogEvents.MUTATION, LogEvents.MANY_MUTATIONS, LogEvents.COMPACTION_START,
+ LogEvents.COMPACTION_FINISH};
+
+ for (int i = 0; i < expectedOrder.length; i++) {
+ Assert.assertEquals(i, expectedOrder[i].ordinal());
+ }
+ }
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogFileKeyTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogFileKeyTest.java
new file mode 100644
index 0000000..0aacef2
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/LogFileKeyTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LogFileKeyTest {
+
+ private static LogFileKey nk(LogEvents e, int tabletId, long seq) {
+ LogFileKey k = new LogFileKey();
+ k.event = e;
+ k.tabletId = tabletId;
+ k.seq = seq;
+ return k;
+ }
+
+ @Test
+ public void testEquivalence() {
+
+ LogFileKey start = nk(LogEvents.COMPACTION_START, 1, 3);
+ LogFileKey finish = nk(LogEvents.COMPACTION_FINISH, 1, 3);
+ LogFileKey mut = nk(LogEvents.MUTATION, 1, 3);
+ LogFileKey mmut = nk(LogEvents.MANY_MUTATIONS, 1, 3);
+
+ Assert.assertTrue(start.compareTo(finish) == 0);
+ Assert.assertTrue(finish.compareTo(start) == 0);
+
+ Assert.assertTrue(mut.compareTo(mmut) == 0);
+ Assert.assertTrue(mmut.compareTo(mut) == 0);
+ }
+
+ @Test
+ public void testSortOrder() {
+ List<LogFileKey> keys = new ArrayList<>();
+
+ // add keys in expected sort order
+ keys.add(nk(LogEvents.OPEN, 0, 0));
+
+ keys.add(nk(LogEvents.DEFINE_TABLET, 3, 6));
+ keys.add(nk(LogEvents.DEFINE_TABLET, 3, 7));
+ keys.add(nk(LogEvents.DEFINE_TABLET, 4, 2));
+ keys.add(nk(LogEvents.DEFINE_TABLET, 4, 9));
+
+ keys.add(nk(LogEvents.COMPACTION_START, 3, 3));
+ keys.add(nk(LogEvents.COMPACTION_FINISH, 3, 5));
+ keys.add(nk(LogEvents.COMPACTION_START, 3, 7));
+ keys.add(nk(LogEvents.COMPACTION_FINISH, 3, 9));
+
+ keys.add(nk(LogEvents.COMPACTION_START, 4, 1));
+ keys.add(nk(LogEvents.COMPACTION_FINISH, 4, 3));
+ keys.add(nk(LogEvents.COMPACTION_START, 4, 11));
+ keys.add(nk(LogEvents.COMPACTION_FINISH, 4, 13));
+
+ keys.add(nk(LogEvents.MUTATION, 3, 1));
+ keys.add(nk(LogEvents.MUTATION, 3, 2));
+ keys.add(nk(LogEvents.MUTATION, 3, 3));
+ keys.add(nk(LogEvents.MUTATION, 3, 3));
+ keys.add(nk(LogEvents.MANY_MUTATIONS, 3, 11));
+
+ keys.add(nk(LogEvents.MANY_MUTATIONS, 4, 2));
+ keys.add(nk(LogEvents.MUTATION, 4, 3));
+ keys.add(nk(LogEvents.MUTATION, 4, 5));
+ keys.add(nk(LogEvents.MUTATION, 4, 7));
+ keys.add(nk(LogEvents.MANY_MUTATIONS, 4, 15));
+
+ for (int i = 0; i < 10; i++) {
+ List<LogFileKey> testList = new ArrayList<>(keys);
+ Collections.shuffle(testList);
+ Collections.sort(testList);
+
+ Assert.assertEquals(keys, testList);
+ }
+
+ }
+}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
index 166a381..37feafd 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
@@ -22,9 +22,17 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.tserver.log.RecoveryLogReader.SortCheckIterator;
+import org.apache.accumulo.tserver.logger.LogEvents;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -144,4 +152,29 @@ public class RecoveryLogsReaderTest {
}
+ @Test(expected = IllegalStateException.class)
+ public void testSortCheck() {
+
+ List<Entry<LogFileKey,LogFileValue>> unsorted = new ArrayList<>();
+
+ LogFileKey k1 = new LogFileKey();
+ k1.event = LogEvents.MANY_MUTATIONS;
+ k1.tabletId = 2;
+ k1.seq = 55;
+
+ LogFileKey k2 = new LogFileKey();
+ k2.event = LogEvents.MANY_MUTATIONS;
+ k2.tabletId = 9;
+ k2.seq = 9;
+
+ unsorted.add(new AbstractMap.SimpleEntry<>(k2, (LogFileValue) null));
+ unsorted.add(new AbstractMap.SimpleEntry<>(k1, (LogFileValue) null));
+
+ SortCheckIterator iter = new SortCheckIterator(unsorted.iterator());
+
+ while (iter.hasNext()) {
+ iter.next();
+ }
+ }
+
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
index dc48d62..bdf5210 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/SortedLogRecoveryTest.java
@@ -51,7 +51,9 @@ import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.MapFile.Writer;
import org.apache.hadoop.io.Text;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
public class SortedLogRecoveryTest {
@@ -61,6 +63,9 @@ public class SortedLogRecoveryTest {
static final Text cq = new Text("cq");
static final Value value = new Value("value".getBytes());
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
static class KeyValue implements Comparable<KeyValue> {
public final LogFileKey key;
public final LogFileValue value;
@@ -92,7 +97,7 @@ public class SortedLogRecoveryTest {
KeyValue result = new KeyValue();
result.key.event = type;
result.key.seq = seq;
- result.key.tid = tid;
+ result.key.tabletId = tid;
switch (type) {
case OPEN:
result.key.tserverSession = (String) fileExtentMutation;
@@ -676,7 +681,9 @@ public class SortedLogRecoveryTest {
@Test
public void testLeaveAndComeBack() throws IOException {
- // TODO document scenario
+ /**
+ * This test recreates the situation in bug #449 (GitHub issues).
+ */
Mutation m1 = new ServerMutation(new Text("r1"));
m1.put("f1", "q1", "v1");
@@ -705,7 +712,6 @@ public class SortedLogRecoveryTest {
@Test
public void testMultipleTablets() throws IOException {
- // TODO document scenario
KeyExtent e1 = new KeyExtent("1", new Text("m"), null);
KeyExtent e2 = new KeyExtent("1", null, new Text("m"));
@@ -812,7 +818,178 @@ public class SortedLogRecoveryTest {
}
}
- // TODO test only logs with only a compaction finish event
- // TODO test logs with consecutive compaction finish events
- // TODO test logs with consecutive duplicate compaction finish events
+ @Test
+ public void testOnlyCompactionFinishEvent() throws IOException {
+ Mutation m1 = new ServerMutation(new Text("r1"));
+ m1.put("f1", "q1", "v1");
+
+ Mutation m2 = new ServerMutation(new Text("r2"));
+ m2.put("f1", "q1", "v2");
+
+ // A compaction finish event without a preceding compaction start event indicates the write
+ // ahead logs are incomplete in some way. This should cause an exception.
+
+ KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+ createKeyValue(DEFINE_TABLET, 100, 10, extent), createKeyValue(MUTATION, 100, 10, m1),
+ createKeyValue(COMPACTION_FINISH, 102, 10, null), createKeyValue(MUTATION, 105, 10, m2)};
+
+ Arrays.sort(entries1);
+
+ Map<String,KeyValue[]> logs = new TreeMap<>();
+ logs.put("entries1", entries1);
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage(
+ LogEvents.COMPACTION_FINISH.name() + " (without preceding " + LogEvents.COMPACTION_START);
+ recover(logs, extent);
+ }
+
+ @Test
+ public void testConsecutiveCompactionFinishEvents() throws IOException {
+ Mutation m1 = new ServerMutation(new Text("r1"));
+ m1.put("f1", "q1", "v1");
+
+ Mutation m2 = new ServerMutation(new Text("r2"));
+ m2.put("f1", "q1", "v2");
+
+ // Consecutive compaction finish events with different sequence numbers indicate the write
+ // ahead logs are incomplete in some way. This should cause an exception.
+
+ KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+ createKeyValue(DEFINE_TABLET, 100, 10, extent), createKeyValue(MUTATION, 100, 10, m1),
+ createKeyValue(COMPACTION_START, 102, 10, "/t/f1"),
+ createKeyValue(COMPACTION_FINISH, 103, 10, null),
+ createKeyValue(COMPACTION_FINISH, 109, 10, null), createKeyValue(MUTATION, 105, 10, m2)};
+
+ Arrays.sort(entries1);
+
+ Map<String,KeyValue[]> logs = new TreeMap<>();
+ logs.put("entries1", entries1);
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("consecutive " + LogEvents.COMPACTION_FINISH.name());
+ recover(logs, extent);
+ }
+
+ @Test
+ public void testDuplicateCompactionFinishEvents() throws IOException {
+ Mutation m1 = new ServerMutation(new Text("r1"));
+ m1.put("f1", "q1", "v1");
+
+ Mutation m2 = new ServerMutation(new Text("r2"));
+ m2.put("f1", "q1", "v2");
+
+ // Duplicate consecutive compaction finish events should not cause an exception.
+
+ KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+ createKeyValue(DEFINE_TABLET, 100, 10, extent), createKeyValue(MUTATION, 100, 10, m1),
+ createKeyValue(COMPACTION_START, 102, 10, "/t/f1"),
+ createKeyValue(COMPACTION_FINISH, 104, 10, null),
+ createKeyValue(COMPACTION_FINISH, 104, 10, null), createKeyValue(MUTATION, 103, 10, m2)};
+
+ Arrays.sort(entries1);
+
+ Map<String,KeyValue[]> logs = new TreeMap<>();
+ logs.put("entries1", entries1);
+
+ List<Mutation> mutations1 = recover(logs, extent);
+ Assert.assertEquals(1, mutations1.size());
+ Assert.assertEquals(m2, mutations1.get(0));
+ }
+
+ @Test
+ public void testEmptyLogFiles() throws IOException {
+ Mutation m1 = new ServerMutation(new Text("r1"));
+ m1.put("f1", "q1", "v1");
+
+ Mutation m2 = new ServerMutation(new Text("r2"));
+ m2.put("f1", "q1", "v2");
+
+ KeyValue entries1[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+ createKeyValue(DEFINE_TABLET, 100, 10, extent), createKeyValue(MUTATION, 100, 10, m1)};
+
+ KeyValue entries2[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1")};
+
+ KeyValue entries3[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+ createKeyValue(DEFINE_TABLET, 105, 10, extent),
+ createKeyValue(COMPACTION_START, 107, 10, "/t/f1")};
+
+ KeyValue entries4[] = new KeyValue[] {};
+
+ KeyValue entries5[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+ createKeyValue(DEFINE_TABLET, 107, 10, extent),
+ createKeyValue(COMPACTION_FINISH, 111, 10, null)};
+
+ KeyValue entries6[] = new KeyValue[] {createKeyValue(OPEN, 0, -1, "1"),
+ createKeyValue(DEFINE_TABLET, 122, 10, extent), createKeyValue(MUTATION, 123, 10, m2)};
+
+ Arrays.sort(entries1);
+ Arrays.sort(entries2);
+ Arrays.sort(entries3);
+ Arrays.sort(entries4);
+ Arrays.sort(entries5);
+ Arrays.sort(entries6);
+
+ Map<String,KeyValue[]> logs = new TreeMap<>();
+
+ logs.put("entries1", entries1);
+
+ List<Mutation> mutations = recover(logs, extent);
+ Assert.assertEquals(1, mutations.size());
+ Assert.assertEquals(m1, mutations.get(0));
+
+ logs.put("entries2", entries2);
+
+ mutations = recover(logs, extent);
+ Assert.assertEquals(1, mutations.size());
+ Assert.assertEquals(m1, mutations.get(0));
+
+ logs.put("entries3", entries3);
+
+ mutations = recover(logs, extent);
+ Assert.assertEquals(1, mutations.size());
+ Assert.assertEquals(m1, mutations.get(0));
+
+ logs.put("entries4", entries4);
+
+ mutations = recover(logs, extent);
+ Assert.assertEquals(1, mutations.size());
+ Assert.assertEquals(m1, mutations.get(0));
+
+ logs.put("entries5", entries5);
+
+ mutations = recover(logs, extent);
+ Assert.assertEquals(0, mutations.size());
+
+ logs.put("entries6", entries6);
+
+ mutations = recover(logs, extent);
+ Assert.assertEquals(1, mutations.size());
+ Assert.assertEquals(m2, mutations.get(0));
+ }
+
+ @Test
+ public void testFileWithoutOpen() throws IOException {
+ Mutation m1 = new ServerMutation(new Text("r1"));
+ m1.put("f1", "q1", "v1");
+
+ Mutation m2 = new ServerMutation(new Text("r2"));
+ m2.put("f1", "q1", "v2");
+
+ // It's expected that every log file has an open event as its first event. Recovery should
+ // throw an error if it is not present.
+
+ KeyValue entries1[] = new KeyValue[] {createKeyValue(DEFINE_TABLET, 100, 10, extent),
+ createKeyValue(MUTATION, 100, 10, m1), createKeyValue(COMPACTION_FINISH, 102, 10, null),
+ createKeyValue(MUTATION, 105, 10, m2)};
+
+ Arrays.sort(entries1);
+
+ Map<String,KeyValue[]> logs = new TreeMap<>();
+ logs.put("entries1", entries1);
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("not " + LogEvents.OPEN);
+ recover(logs, extent);
+ }
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
index 19fb3c9..5e26a9d 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/logger/LogFileTest.java
@@ -46,7 +46,7 @@ public class LogFileTest {
LogFileKey key = new LogFileKey();
key.event = event;
key.seq = seq;
- key.tid = tid;
+ key.tabletId = tid;
key.filename = filename;
key.tablet = tablet;
key.tserverSession = keyResult.tserverSession;
@@ -75,24 +75,24 @@ public class LogFileTest {
readWrite(COMPACTION_FINISH, 1, 2, null, null, null, key, value);
assertEquals(key.event, COMPACTION_FINISH);
assertEquals(key.seq, 1);
- assertEquals(key.tid, 2);
+ assertEquals(key.tabletId, 2);
readWrite(COMPACTION_START, 3, 4, "some file", null, null, key, value);
assertEquals(key.event, COMPACTION_START);
assertEquals(key.seq, 3);
- assertEquals(key.tid, 4);
+ assertEquals(key.tabletId, 4);
assertEquals(key.filename, "some file");
KeyExtent tablet = new KeyExtent("table", new Text("bbbb"), new Text("aaaa"));
readWrite(DEFINE_TABLET, 5, 6, null, tablet, null, key, value);
assertEquals(key.event, DEFINE_TABLET);
assertEquals(key.seq, 5);
- assertEquals(key.tid, 6);
+ assertEquals(key.tabletId, 6);
assertEquals(key.tablet, tablet);
Mutation m = new ServerMutation(new Text("row"));
m.put(new Text("cf"), new Text("cq"), new Value("value".getBytes()));
readWrite(MUTATION, 7, 8, null, null, new Mutation[] {m}, key, value);
assertEquals(key.event, MUTATION);
assertEquals(key.seq, 7);
- assertEquals(key.tid, 8);
+ assertEquals(key.tabletId, 8);
assertEquals(value.mutations, Arrays.asList(m));
m = new ServerMutation(new Text("row"));
m.put(new Text("cf"), new Text("cq"), new ColumnVisibility("vis"), 12345,
@@ -103,12 +103,12 @@ public class LogFileTest {
readWrite(MUTATION, 8, 9, null, null, new Mutation[] {m}, key, value);
assertEquals(key.event, MUTATION);
assertEquals(key.seq, 8);
- assertEquals(key.tid, 9);
+ assertEquals(key.tabletId, 9);
assertEquals(value.mutations, Arrays.asList(m));
readWrite(MANY_MUTATIONS, 9, 10, null, null, new Mutation[] {m, m}, key, value);
assertEquals(key.event, MANY_MUTATIONS);
assertEquals(key.seq, 9);
- assertEquals(key.tid, 10);
+ assertEquals(key.tabletId, 10);
assertEquals(value.mutations, Arrays.asList(m, m));
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
index 1ff9799..107060b 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystemTest.java
@@ -79,7 +79,7 @@ public class AccumuloReplicaSystemTest {
*/
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent("1", null, null);
- key.tid = 1;
+ key.tabletId = 1;
key.write(dos);
value.write(dos);
@@ -94,14 +94,14 @@ public class AccumuloReplicaSystemTest {
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent("2", null, null);
- key.tid = 2;
+ key.tabletId = 2;
value.mutations = Collections.emptyList();
key.write(dos);
value.write(dos);
key.event = LogEvents.OPEN;
- key.tid = LogFileKey.VERSION;
+ key.tabletId = LogFileKey.VERSION;
key.tserverSession = "foobar";
key.write(dos);
@@ -116,7 +116,7 @@ public class AccumuloReplicaSystemTest {
value.write(dos);
key.event = LogEvents.COMPACTION_START;
- key.tid = 2;
+ key.tabletId = 2;
key.filename = "/accumulo/tables/1/t-000001/A000001.rf";
value.mutations = Collections.emptyList();
@@ -125,14 +125,14 @@ public class AccumuloReplicaSystemTest {
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent("1", null, null);
- key.tid = 3;
+ key.tabletId = 3;
value.mutations = Collections.emptyList();
key.write(dos);
value.write(dos);
key.event = LogEvents.COMPACTION_FINISH;
- key.tid = 6;
+ key.tabletId = 6;
value.mutations = Collections.emptyList();
key.write(dos);
@@ -140,7 +140,7 @@ public class AccumuloReplicaSystemTest {
key.tablet = null;
key.event = LogEvents.MUTATION;
- key.tid = 3;
+ key.tabletId = 3;
key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row")));
@@ -188,7 +188,7 @@ public class AccumuloReplicaSystemTest {
*/
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent("1", null, null);
- key.tid = 1;
+ key.tabletId = 1;
key.write(dos);
value.write(dos);
@@ -203,14 +203,14 @@ public class AccumuloReplicaSystemTest {
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent("2", null, null);
- key.tid = 2;
+ key.tabletId = 2;
value.mutations = Collections.emptyList();
key.write(dos);
value.write(dos);
key.event = LogEvents.OPEN;
- key.tid = LogFileKey.VERSION;
+ key.tabletId = LogFileKey.VERSION;
key.tserverSession = "foobar";
key.write(dos);
@@ -225,7 +225,7 @@ public class AccumuloReplicaSystemTest {
value.write(dos);
key.event = LogEvents.COMPACTION_START;
- key.tid = 2;
+ key.tabletId = 2;
key.filename = "/accumulo/tables/1/t-000001/A000001.rf";
value.mutations = Collections.emptyList();
@@ -234,14 +234,14 @@ public class AccumuloReplicaSystemTest {
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent("1", null, null);
- key.tid = 3;
+ key.tabletId = 3;
value.mutations = Collections.emptyList();
key.write(dos);
value.write(dos);
key.event = LogEvents.COMPACTION_FINISH;
- key.tid = 6;
+ key.tabletId = 6;
value.mutations = Collections.emptyList();
key.write(dos);
@@ -249,7 +249,7 @@ public class AccumuloReplicaSystemTest {
key.tablet = null;
key.event = LogEvents.MUTATION;
- key.tid = 3;
+ key.tabletId = 3;
key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row")));
@@ -396,7 +396,7 @@ public class AccumuloReplicaSystemTest {
*/
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent("1", null, null);
- key.tid = 1;
+ key.tabletId = 1;
key.write(dos);
value.write(dos);
@@ -411,7 +411,7 @@ public class AccumuloReplicaSystemTest {
key.tablet = null;
key.event = LogEvents.MUTATION;
- key.tid = 1;
+ key.tabletId = 1;
key.filename = "/accumulo/wals/tserver+port/" + UUID.randomUUID();
value.mutations = Arrays.<Mutation> asList(new ServerMutation(new Text("row")));
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
index 6d2c66a..67524a8 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/BatchWriterReplicationReplayerTest.java
@@ -83,7 +83,7 @@ public class BatchWriterReplicationReplayerTest {
LogFileKey key = new LogFileKey();
key.event = LogEvents.MANY_MUTATIONS;
key.seq = 1;
- key.tid = 1;
+ key.tabletId = 1;
WalEdits edits = new WalEdits();
@@ -149,7 +149,7 @@ public class BatchWriterReplicationReplayerTest {
LogFileKey key = new LogFileKey();
key.event = LogEvents.MANY_MUTATIONS;
key.seq = 1;
- key.tid = 1;
+ key.tabletId = 1;
WalEdits edits = new WalEdits();
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java b/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
index fc47843..bdde3f7 100644
--- a/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/replication/UnusedWalDoesntCloseReplicationStatusIT.java
@@ -118,7 +118,7 @@ public class UnusedWalDoesntCloseReplicationStatusIT extends ConfigurableMacBase
key.event = LogEvents.DEFINE_TABLET;
key.tablet = new KeyExtent(Integer.toString(fakeTableId), null, null);
key.seq = 1l;
- key.tid = 1;
+ key.tabletId = 1;
key.write(dos);
value.write(dos);
--
To stop receiving notification emails like this one, please contact
kturner@apache.org.