You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by jm...@apache.org on 2021/07/14 12:16:01 UTC
[accumulo] branch main updated: Update LogReader to utilize
RecoveryLogsIterator (#2181)
This is an automated email from the ASF dual-hosted git repository.
jmanno pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new f9f1d3f Update LogReader to utilize RecoveryLogsIterator (#2181)
f9f1d3f is described below
commit f9f1d3f6578c400862e9f0e8d82a49bf9c4a5392
Author: Jeffrey Manno <je...@gmail.com>
AuthorDate: Wed Jul 14 08:15:52 2021 -0400
Update LogReader to utilize RecoveryLogsIterator (#2181)
* Adds utilization of RecoveryLogsIterator to read sorted Rfiles inside LogReader.java
* Removed old implementation of RecoveryLogReader and removed associated test
* Added unit test for RecoveryLogsIterator, RecoveryLogsIteratorTest
Co-authored-by: Christopher Tubbs <ct...@apache.org>
---
.../accumulo/tserver/log/RecoveryLogReader.java | 326 ---------------------
.../accumulo/tserver/log/RecoveryLogsIterator.java | 4 +-
.../apache/accumulo/tserver/logger/LogReader.java | 31 +-
.../tserver/log/RecoveryLogsIteratorTest.java | 251 ++++++++++++++++
.../tserver/log/RecoveryLogsReaderTest.java | 216 --------------
5 files changed, 273 insertions(+), 555 deletions(-)
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
deleted file mode 100644
index bde5a1c..0000000
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogReader.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.tserver.log;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.AbstractMap;
-import java.util.Iterator;
-import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.PriorityQueue;
-
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.SortedLogState;
-import org.apache.accumulo.tserver.logger.LogEvents;
-import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.MapFile.Reader;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
-
-/**
- * A class which reads sorted recovery logs produced from a single WAL.
- *
- * Presently only supports next() and seek() and works on all the Map directories within a
- * directory. The primary purpose of this class is to merge the results of multiple Reduce jobs that
- * result in Map output files.
- */
-public class RecoveryLogReader implements CloseableIterator<Entry<LogFileKey,LogFileValue>> {
-
- /**
- * Group together the next key/value from a Reader with the Reader
- */
- private static class Index implements Comparable<Index> {
- Reader reader;
- WritableComparable<?> key;
- Writable value;
- boolean cached = false;
-
- private static Object create(java.lang.Class<?> klass) {
- try {
- return klass.getConstructor().newInstance();
- } catch (Exception t) {
- throw new RuntimeException("Unable to construct objects to use for comparison");
- }
- }
-
- public Index(Reader reader) {
- this.reader = reader;
- key = (WritableComparable<?>) create(reader.getKeyClass());
- value = (Writable) create(reader.getValueClass());
- }
-
- private void cache() throws IOException {
- if (!cached && reader.next(key, value)) {
- cached = true;
- }
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(key);
- }
-
- @Override
- public boolean equals(Object obj) {
- return this == obj || (obj != null && obj instanceof Index && compareTo((Index) obj) == 0);
- }
-
- @Override
- public int compareTo(Index o) {
- try {
- cache();
- o.cache();
- // no more data: always goes to the end
- if (!cached)
- return 1;
- if (!o.cached)
- return -1;
- @SuppressWarnings({"unchecked", "rawtypes"})
- int result = ((WritableComparable) key).compareTo(o.key);
- return result;
- } catch (IOException ex) {
- throw new UncheckedIOException(ex);
- }
- }
- }
-
- private PriorityQueue<Index> heap = new PriorityQueue<>();
- private Iterator<Entry<LogFileKey,LogFileValue>> iter;
-
- public RecoveryLogReader(VolumeManager fs, Path directory) throws IOException {
- this(fs, directory, null, null);
- }
-
- public RecoveryLogReader(VolumeManager fs, Path directory, LogFileKey start, LogFileKey end)
- throws IOException {
- boolean foundFinish = false;
- for (FileStatus child : fs.listStatus(directory)) {
- if (child.getPath().getName().startsWith("_"))
- continue;
- if (SortedLogState.isFinished(child.getPath().getName())) {
- foundFinish = true;
- continue;
- }
- if (SortedLogState.FAILED.getMarker().equals(child.getPath().getName())) {
- continue;
- }
- FileSystem ns = fs.getFileSystemByPath(child.getPath());
- heap.add(new Index(new Reader(ns.makeQualified(child.getPath()), ns.getConf())));
- }
- if (!foundFinish)
- throw new IOException(
- "Sort '" + SortedLogState.FINISHED.getMarker() + "' flag not found in " + directory);
-
- iter = new SortCheckIterator(new RangeIterator(start, end));
- }
-
- private static void copy(Writable src, Writable dest) throws IOException {
- // not exactly efficient...
- DataOutputBuffer output = new DataOutputBuffer();
- src.write(output);
- DataInputBuffer input = new DataInputBuffer();
- input.reset(output.getData(), output.getLength());
- dest.readFields(input);
- }
-
- @VisibleForTesting
- synchronized boolean next(WritableComparable<?> key, Writable val) throws IOException {
- Index elt = heap.remove();
- try {
- elt.cache();
- if (elt.cached) {
- copy(elt.key, key);
- copy(elt.value, val);
- elt.cached = false;
- } else {
- return false;
- }
- } finally {
- heap.add(elt);
- }
- return true;
- }
-
- @VisibleForTesting
- synchronized boolean seek(WritableComparable<?> key) throws IOException {
- PriorityQueue<Index> reheap = new PriorityQueue<>(heap.size());
- boolean result = false;
- for (Index index : heap) {
- try {
- WritableComparable<?> found = index.reader.getClosest(key, index.value, true);
- if (found != null && found.equals(key)) {
- result = true;
- }
- } catch (EOFException ex) {
- // thrown if key is beyond all data in the map
- }
- index.cached = false;
- reheap.add(index);
- }
- heap = reheap;
- return result;
- }
-
- @Override
- public void close() throws IOException {
- IOException problem = null;
- for (Index index : heap) {
- try {
- index.reader.close();
- } catch (IOException ex) {
- problem = ex;
- }
- }
- if (problem != null)
- throw problem;
- heap = null;
- }
-
- /**
- * Ensures source iterator provides data in sorted order
- */
- @VisibleForTesting
- static class SortCheckIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {
-
- private PeekingIterator<Entry<LogFileKey,LogFileValue>> source;
-
- SortCheckIterator(Iterator<Entry<LogFileKey,LogFileValue>> source) {
- this.source = Iterators.peekingIterator(source);
-
- }
-
- @Override
- public boolean hasNext() {
- return source.hasNext();
- }
-
- @Override
- public Entry<LogFileKey,LogFileValue> next() {
- Entry<LogFileKey,LogFileValue> next = source.next();
- if (source.hasNext()) {
- Preconditions.checkState(next.getKey().compareTo(source.peek().getKey()) <= 0,
- "Keys not in order %s %s", next.getKey(), source.peek().getKey());
- }
- return next;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove");
- }
- }
-
- private class RangeIterator implements Iterator<Entry<LogFileKey,LogFileValue>> {
-
- private LogFileKey key = new LogFileKey();
- private LogFileValue value = new LogFileValue();
- private boolean hasNext;
- private LogFileKey end;
-
- private boolean next(LogFileKey key, LogFileValue value) throws IOException {
- try {
- return RecoveryLogReader.this.next(key, value);
- } catch (EOFException e) {
- return false;
- }
- }
-
- RangeIterator(LogFileKey start, LogFileKey end) throws IOException {
- this.end = end;
-
- if (start != null) {
- hasNext = next(key, value);
-
- if (hasNext && key.event != LogEvents.OPEN) {
- throw new IllegalStateException("First log entry value is not OPEN");
- }
-
- seek(start);
- }
-
- hasNext = next(key, value);
-
- if (hasNext && start != null && key.compareTo(start) < 0) {
- throw new IllegalStateException("First key is less than start " + key + " " + start);
- }
-
- if (hasNext && end != null && key.compareTo(end) > 0) {
- hasNext = false;
- }
- }
-
- @Override
- public boolean hasNext() {
- return hasNext;
- }
-
- @Override
- public Entry<LogFileKey,LogFileValue> next() {
- Preconditions.checkState(hasNext);
- Entry<LogFileKey,LogFileValue> entry = new AbstractMap.SimpleImmutableEntry<>(key, value);
-
- key = new LogFileKey();
- value = new LogFileValue();
- try {
- hasNext = next(key, value);
- if (hasNext && end != null && key.compareTo(end) > 0) {
- hasNext = false;
- }
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
-
- return entry;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove");
- }
- }
-
- @Override
- public boolean hasNext() {
- return iter.hasNext();
- }
-
- @Override
- public Entry<LogFileKey,LogFileValue> next() {
- return iter.next();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("remove");
- }
-
-}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
index 0f5e259..75999d7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
@@ -59,12 +59,12 @@ public class RecoveryLogsIterator
/**
* Scans the files in each recoveryLogDir over the range [start,end].
*/
- RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+ public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
LogFileKey end, boolean checkFirstKey) throws IOException {
List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
scanners = new ArrayList<>();
- Range range = LogFileKey.toRange(start, end);
+ Range range = start == null ? null : LogFileKey.toRange(start, end);
var vm = context.getVolumeManager();
for (Path logDir : recoveryLogDirs) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
index 5c0ce29..a984418 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
@@ -23,6 +23,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.DataInputStream;
import java.io.EOFException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;
@@ -35,11 +36,13 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.start.spi.KeywordExecutable;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.RecoveryLogReader;
+import org.apache.accumulo.tserver.log.RecoveryLogsIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
@@ -72,7 +75,7 @@ public class LogReader implements KeywordExecutable {
}
/**
- * Dump a Log File (Map or Sequence) to stdout. Will read from HDFS or local file system.
+ * Dump a Log File to stdout. Will read from HDFS or local file system.
*
* @param args
* - first argument is the file to print
@@ -103,7 +106,8 @@ public class LogReader implements KeywordExecutable {
}
var siteConfig = SiteConfiguration.auto();
- try (var fs = VolumeManagerImpl.get(siteConfig, new Configuration())) {
+ ServerContext context = new ServerContext(siteConfig);
+ try (VolumeManager fs = VolumeManagerImpl.get(siteConfig, new Configuration())) {
Matcher rowMatcher = null;
KeyExtent ke = null;
@@ -123,13 +127,18 @@ public class LogReader implements KeywordExecutable {
Set<Integer> tabletIds = new HashSet<>();
for (String file : opts.files) {
-
Path path = new Path(file);
LogFileKey key = new LogFileKey();
LogFileValue value = new LogFileValue();
+ // ensure it's a regular non-sorted WAL file, and not a single sorted WAL in RFile format
if (fs.getFileStatus(path).isFile()) {
- // read log entries from a simple hdfs file
+ if (file.endsWith(".rf")) {
+ log.error("Unable to read from a single RFile. A non-sorted WAL file was expected. "
+ + "To read sorted WALs, please pass in a directory containing the sorted recovery logs.");
+ continue;
+ }
+
try (final FSDataInputStream fsinput = fs.open(path);
DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
while (true) {
@@ -146,10 +155,12 @@ public class LogReader implements KeywordExecutable {
continue;
}
} else {
- // read the log entries sorted in a map file
- try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
- while (input.hasNext()) {
- Entry<LogFileKey,LogFileValue> entry = input.next();
+ // read the log entries in a sorted RFile. This has to be a directory that contains the
+ // finished file.
+ try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), null,
+ null, false)) {
+ while (rli.hasNext()) {
+ Entry<LogFileKey,LogFileValue> entry = rli.next();
printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
opts.maxMutations);
}
@@ -200,9 +211,7 @@ public class LogReader implements KeywordExecutable {
}
}
-
System.out.println(key);
System.out.println(LogFileValue.format(value, maxMutations));
}
-
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
new file mode 100644
index 0000000..d6a013e
--- /dev/null
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
+public class RecoveryLogsIteratorTest {
+
+ private VolumeManager fs;
+ private File workDir;
+ static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
+ static ServerContext context;
+ static LogSorter logSorter;
+
+ @Rule
+ public TemporaryFolder tempFolder =
+ new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+ @Before
+ public void setUp() throws Exception {
+ context = createMock(ServerContext.class);
+ logSorter = new LogSorter(context, DefaultConfiguration.getInstance());
+
+ workDir = tempFolder.newFolder();
+ String path = workDir.getAbsolutePath();
+ assertTrue(workDir.delete());
+ fs = VolumeManagerImpl.getLocalForTesting(path);
+ expect(context.getVolumeManager()).andReturn(fs).anyTimes();
+ expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+ .anyTimes();
+ expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+ replay(context);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ fs.close();
+ }
+
+ static class KeyValue implements Comparable<KeyValue> {
+ public final LogFileKey key;
+ public final LogFileValue value;
+
+ KeyValue() {
+ key = new LogFileKey();
+ value = new LogFileValue();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(key) + Objects.hashCode(value);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return this == obj || (obj instanceof KeyValue && 0 == compareTo((KeyValue) obj));
+ }
+
+ @Override
+ public int compareTo(KeyValue o) {
+ return key.compareTo(o.key);
+ }
+ }
+
+ @Test
+ public void testSimpleRLI() throws IOException {
+ KeyValue keyValue = new KeyValue();
+ keyValue.key.event = DEFINE_TABLET;
+ keyValue.key.seq = 0;
+ keyValue.key.tabletId = 1;
+ keyValue.key.tablet = extent;
+
+ KeyValue[] keyValues = {keyValue};
+
+ Map<String,KeyValue[]> logs = new TreeMap<>();
+ logs.put("keyValues", keyValues);
+
+ ArrayList<Path> dirs = new ArrayList<>();
+
+ createRecoveryDir(logs, dirs, true);
+
+ try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) {
+ while (rli.hasNext()) {
+ Entry<LogFileKey,LogFileValue> entry = rli.next();
+ assertEquals("TabletId does not match", 1, entry.getKey().tabletId);
+ assertEquals("Event does not match", DEFINE_TABLET, entry.getKey().event);
+ }
+ }
+ }
+
+ @Test
+ public void testFinishMarker() throws IOException {
+ KeyValue keyValue = new KeyValue();
+ keyValue.key.event = DEFINE_TABLET;
+ keyValue.key.seq = 0;
+ keyValue.key.tabletId = 1;
+ keyValue.key.tablet = extent;
+
+ KeyValue[] keyValues = {keyValue};
+
+ Map<String,KeyValue[]> logs = new TreeMap<>();
+ logs.put("keyValues", keyValues);
+
+ ArrayList<Path> dirs = new ArrayList<>();
+
+ createRecoveryDir(logs, dirs, false);
+
+ assertThrows("Finish marker should not be found", IOException.class,
+ () -> new RecoveryLogsIterator(context, dirs, null, null, false));
+ }
+
+ @Test
+ public void testSingleFile() throws IOException {
+ String destPath = workDir + "/test.rf";
+ fs.create(new Path(destPath));
+
+ assertThrows("Finish marker should not be found for a single file.", IOException.class,
+ () -> new RecoveryLogsIterator(context, Collections.singletonList(new Path(destPath)), null,
+ null, false));
+ }
+
+ @Test
+ public void testCheckFirstKeyFailed() throws IOException {
+ KeyValue keyValue = new KeyValue();
+ keyValue.key.event = DEFINE_TABLET;
+ keyValue.key.seq = 0;
+ keyValue.key.tabletId = 1;
+ keyValue.key.tablet = extent;
+
+ KeyValue[] keyValues = {keyValue};
+
+ Map<String,KeyValue[]> logs = new TreeMap<>();
+ logs.put("keyValues", keyValues);
+
+ ArrayList<Path> dirs = new ArrayList<>();
+
+ createRecoveryDir(logs, dirs, true);
+
+ assertThrows("First log entry is not OPEN so exception should be thrown.",
+ IllegalStateException.class,
+ () -> new RecoveryLogsIterator(context, dirs, null, null, true));
+ }
+
+ @Test
+ public void testCheckFirstKeyPass() throws IOException {
+ KeyValue keyValue1 = new KeyValue();
+ keyValue1.key.event = OPEN;
+ keyValue1.key.seq = 0;
+ keyValue1.key.tabletId = -1;
+ keyValue1.key.tserverSession = "1";
+
+ KeyValue keyValue2 = new KeyValue();
+ keyValue2.key.event = DEFINE_TABLET;
+ keyValue2.key.seq = 0;
+ keyValue2.key.tabletId = 1;
+ keyValue2.key.tablet = extent;
+
+ KeyValue[] keyValues = {keyValue1, keyValue2};
+
+ Map<String,KeyValue[]> logs = new TreeMap<>();
+ logs.put("keyValues", keyValues);
+
+ ArrayList<Path> dirs = new ArrayList<>();
+
+ createRecoveryDir(logs, dirs, true);
+
+ try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, true)) {
+ while (rli.hasNext()) {
+ Entry<LogFileKey,LogFileValue> entry = rli.next();
+ assertNotNull(entry.getKey());
+ }
+ }
+ }
+
+ private void createRecoveryDir(Map<String,KeyValue[]> logs, ArrayList<Path> dirs,
+ boolean FinishMarker) throws IOException {
+
+ for (Entry<String,KeyValue[]> entry : logs.entrySet()) {
+ String destPath = workDir + "/dir";
+ FileSystem ns = fs.getFileSystemByPath(new Path(destPath));
+
+ // convert test object to Pairs for LogSorter.
+ List<Pair<LogFileKey,LogFileValue>> buffer = new ArrayList<>();
+ for (KeyValue pair : entry.getValue()) {
+ buffer.add(new Pair<>(pair.key, pair.value));
+ }
+ logSorter.writeBuffer(destPath, buffer, 0);
+
+ if (FinishMarker)
+ ns.create(SortedLogState.getFinishedMarkerPath(destPath));
+
+ dirs.add(new Path(destPath));
+ }
+ }
+}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
deleted file mode 100644
index 5a3a0af..0000000
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsReaderTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.accumulo.tserver.log;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.log.SortedLogState;
-import org.apache.accumulo.tserver.log.RecoveryLogReader.SortCheckIterator;
-import org.apache.accumulo.tserver.logger.LogEvents;
-import org.apache.accumulo.tserver.logger.LogFileKey;
-import org.apache.accumulo.tserver.logger.LogFileValue;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.MapFile.Writer;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
-public class RecoveryLogsReaderTest {
-
- private VolumeManager fs;
- private File workDir;
-
- @Rule
- public TemporaryFolder tempFolder =
- new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
-
- @Before
- public void setUp() throws Exception {
- workDir = tempFolder.newFolder();
- String path = workDir.getAbsolutePath();
- assertTrue(workDir.delete());
- fs = VolumeManagerImpl.getLocalForTesting(path);
- Path root = new Path("file://" + path);
- fs.mkdirs(root);
- fs.create(new Path(root, "finished")).close();
- FileSystem ns = fs.getFileSystemByPath(root);
-
- Writer oddWriter = new Writer(ns.getConf(), ns.makeQualified(new Path(root, "odd")),
- Writer.keyClass(IntWritable.class), Writer.valueClass(BytesWritable.class));
- BytesWritable value = new BytesWritable("someValue".getBytes());
- for (int i = 1; i < 1000; i += 2) {
- oddWriter.append(new IntWritable(i), value);
- }
- oddWriter.close();
-
- Writer evenWriter = new Writer(ns.getConf(), ns.makeQualified(new Path(root, "even")),
- Writer.keyClass(IntWritable.class), Writer.valueClass(BytesWritable.class));
- for (int i = 0; i < 1000; i += 2) {
- if (i == 10)
- continue;
- evenWriter.append(new IntWritable(i), value);
- }
- evenWriter.close();
- }
-
- @After
- public void tearDown() throws Exception {
- fs.close();
- }
-
- private void scan(RecoveryLogReader reader, int start) throws IOException {
- IntWritable key = new IntWritable();
- BytesWritable value = new BytesWritable();
-
- for (int i = start + 1; i < 1000; i++) {
- if (i == 10)
- continue;
- assertTrue(reader.next(key, value));
- assertEquals(i, key.get());
- }
- }
-
- private void scanOdd(RecoveryLogReader reader, int start) throws IOException {
- IntWritable key = new IntWritable();
- BytesWritable value = new BytesWritable();
-
- for (int i = start + 2; i < 1000; i += 2) {
- assertTrue(reader.next(key, value));
- assertEquals(i, key.get());
- }
- }
-
- @Test
- public void testMultiReader() throws IOException {
- Path manyMaps = new Path("file://" + workDir.getAbsolutePath());
- RecoveryLogReader reader = new RecoveryLogReader(fs, manyMaps);
- IntWritable key = new IntWritable();
- BytesWritable value = new BytesWritable();
-
- for (int i = 0; i < 1000; i++) {
- if (i == 10)
- continue;
- assertTrue(reader.next(key, value));
- assertEquals(i, key.get());
- }
- assertEquals(value.compareTo(new BytesWritable("someValue".getBytes())), 0);
- assertFalse(reader.next(key, value));
-
- key.set(500);
- assertTrue(reader.seek(key));
- scan(reader, 500);
- key.set(10);
- assertFalse(reader.seek(key));
- scan(reader, 10);
- key.set(1000);
- assertFalse(reader.seek(key));
- assertFalse(reader.next(key, value));
- key.set(-1);
- assertFalse(reader.seek(key));
- key.set(0);
- assertTrue(reader.next(key, value));
- assertEquals(0, key.get());
- reader.close();
-
- fs.deleteRecursively(new Path(manyMaps, "even"));
- reader = new RecoveryLogReader(fs, manyMaps);
- key.set(501);
- assertTrue(reader.seek(key));
- scanOdd(reader, 501);
- key.set(1000);
- assertFalse(reader.seek(key));
- assertFalse(reader.next(key, value));
- key.set(-1);
- assertFalse(reader.seek(key));
- key.set(1);
- assertTrue(reader.next(key, value));
- assertEquals(1, key.get());
- reader.close();
-
- }
-
- @Test(expected = IllegalStateException.class)
- public void testSortCheck() {
-
- List<Entry<LogFileKey,LogFileValue>> unsorted = new ArrayList<>();
-
- LogFileKey k1 = new LogFileKey();
- k1.event = LogEvents.MANY_MUTATIONS;
- k1.tabletId = 2;
- k1.seq = 55;
-
- LogFileKey k2 = new LogFileKey();
- k2.event = LogEvents.MANY_MUTATIONS;
- k2.tabletId = 9;
- k2.seq = 9;
-
- unsorted.add(new AbstractMap.SimpleEntry<>(k2, (LogFileValue) null));
- unsorted.add(new AbstractMap.SimpleEntry<>(k1, (LogFileValue) null));
-
- SortCheckIterator iter = new SortCheckIterator(unsorted.iterator());
-
- while (iter.hasNext()) {
- iter.next();
- }
- }
-
- /**
- * Test a failed marker doesn't cause issues. See Github issue
- * https://github.com/apache/accumulo/issues/961
- */
- @Test
- public void testFailed() throws Exception {
- Path manyMaps = new Path("file://" + workDir.getAbsolutePath());
- fs.create(new Path(manyMaps, SortedLogState.FAILED.getMarker())).close();
-
- RecoveryLogReader reader = new RecoveryLogReader(fs, manyMaps);
- IntWritable key = new IntWritable();
- BytesWritable value = new BytesWritable();
-
- for (int i = 0; i < 1000; i++) {
- if (i == 10)
- continue;
- assertTrue(reader.next(key, value));
- assertEquals(i, key.get());
- }
- reader.close();
-
- assertTrue(fs.delete(new Path(manyMaps, SortedLogState.FAILED.getMarker())));
- }
-
-}