Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2021/06/28 17:17:06 UTC

[GitHub] [accumulo] Manno15 opened a new pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Manno15 opened a new pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181


   Fixes #2172. 
   
   I wanted to get my progress on this issue out there, as things are close to being finished. I still need to do more testing against old-style WALs and more code clean-up, especially in `RecoveryLogsIterator`, which has some repeated code and where I am not quite sure what is needed for the single-file case. Beyond that, everything seems to work as expected when using wal-info.
   
   Below is sample output from wal-info. There appears to be a memory issue, which I am looking into now.
   <details>
   
   ```
   OPEN 458983ce-7008-4010-8ef1-8f785754fa40
   
   DEFINE_TABLET 1 1 +r<<
   
   DEFINE_TABLET 3 1 !0;~<
   
   DEFINE_TABLET 5 1 2<<
   
   COMPACTION_START 5 2 hdfs://groot:8020/accumulo/tables/2/default_tablet/F0000000.rf
   
   MANY_MUTATIONS 1 1
   2 mutations:
     !0;~
         suspend:loc [system]:1 [] <deleted>
         future:10000600c210004 [system]:1 [] groot:9997
     !0<
         suspend:loc [system]:2 [] <deleted>
         future:10000600c210004 [system]:2 [] groot:9997
   
   MUTATION 1 1
   2 mutations:
     !0<
         loc:10000600c210004 [system]:3 [] groot:9997
         future:10000600c210004 [system]:3 [] <deleted>
         suspend:loc [system]:3 [] <deleted>
     !0;~
         loc:10000600c210004 [system]:4 [] groot:9997
         future:10000600c210004 [system]:4 [] <deleted>
         suspend:loc [system]:4 [] <deleted>
   
   MUTATION 3 1
   6 mutations:
     1<
         ~tab:~pr [system]:1 [] 
         srv:dir [system]:1 [] default_tablet
         srv:time [system]:1 [] M0
         srv:lock [system]:1 [] managers/lock/zlock#acd63101-440c-4554-af2c-81d680e2897a#0000000000$10000600c21000b
     1<
         suspend:loc [system]:2 [] <deleted>
         future:10000600c210004 [system]:2 [] groot:9997
     1<
         loc:10000600c210004 [system]:3 [] groot:9997
         future:10000600c210004 [system]:3 [] <deleted>
         suspend:loc [system]:3 [] <deleted>
     2<
         ~tab:~pr [system]:4 [] 
         srv:dir [system]:4 [] default_tablet
         srv:time [system]:4 [] M0
         srv:lock [system]:4 [] managers/lock/zlock#acd63101-440c-4554-af2c-81d680e2897a#0000000000$10000600c21000b
     2<
         suspend:loc [system]:5 [] <deleted>
         future:10000600c210004 [system]:5 [] groot:9997
   ...
   #
   # java.lang.OutOfMemoryError: Java heap space
   # -XX:OnOutOfMemoryError="kill -9 %p"
   #   Executing /bin/sh -c "kill -9 41830"...
   [1]    41830 killed     accumulo wal-info 
   ```
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r664895009



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       Also, if a user specifies a single sorted WAL to read, it will error out (the second part of your issue mentions this). I will look into adding a check to prevent this, though a more descriptive error message could suffice.
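A minimal sketch of the "more descriptive error message" option, assuming the check is made on the argument string alone (as the `.rf` extension check elsewhere in this thread does). The class name, method name, and example paths are hypothetical; the sketch only suggests the enclosing directory and does not verify that it is a recovery directory.

```java
import java.util.Optional;

// Hypothetical helper, not part of Accumulo: builds a descriptive error for a
// wal-info argument that names a single sorted RFile instead of its directory.
final class SortedRFileArgCheck {

  /** Returns an error message if {@code file} looks like a single sorted RFile, else empty. */
  static Optional<String> check(String file) {
    if (!file.endsWith(".rf")) {
      return Optional.empty();
    }
    // Suggest the enclosing directory; purely string-based, no filesystem access.
    int slash = file.lastIndexOf('/');
    String parent = slash < 0 ? "." : (slash == 0 ? "/" : file.substring(0, slash));
    return Optional.of("Cannot read a single sorted RFile (" + file
        + "). Pass its recovery directory instead, e.g. " + parent);
  }

  public static void main(String[] args) {
    System.out.println(check("/accumulo/recovery/abc/part-r-00000.rf").orElse("ok"));
    System.out.println(check("/accumulo/recovery/abc").orElse("ok"));
  }
}
```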







[GitHub] [accumulo] Manno15 commented on pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-872436542


   > java.lang.OutOfMemoryError: Java heap space
   > -XX:OnOutOfMemoryError="kill -9 %p"
   >   Executing /bin/sh -c "kill -9 41830"...
   > [1]    41830 killed     accumulo wal-info 
   
   For the OOM issue, should I adjust the heap space for wal-info inside accumulo-env.sh, or is that something we should let the user decide for that particular utility? I tested raising the limit to 512m and did not run into the error.
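Whichever way the default is settled, one quick way to confirm which heap limit a JVM tool actually received is `Runtime.maxMemory()`. A minimal sketch; the class name is hypothetical and it is not wired into wal-info. The reported value can be somewhat below the `-Xmx` setting (for example, 512m), depending on the garbage collector.

```java
// Hypothetical diagnostic: reports the maximum heap the current JVM will attempt to use.
final class HeapReport {

  /** Max heap in MiB, or -1 if the JVM reports no limit (Long.MAX_VALUE). */
  static long maxHeapMiB() {
    long max = Runtime.getRuntime().maxMemory();
    return max == Long.MAX_VALUE ? -1 : max / (1024 * 1024);
  }

  public static void main(String[] args) {
    long mib = maxHeapMiB();
    System.out.println(mib < 0 ? "max heap: no limit" : "max heap: " + mib + " MiB");
  }
}
```

Run as, for example, `java -Xmx512m HeapReport.java` (single-file launch needs Java 11 or later).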





[GitHub] [accumulo] Manno15 merged pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 merged pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181


   





[GitHub] [accumulo] milleruntime commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665331379



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       OK, cool. If the user passes in an RFile, I think you could just create a singleton list to pass as the `recoveryLogDirs` parameter.
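The suggestion amounts to wrapping the one path in a list, as the diff above already does for directories with `Collections.singletonList(path)`. A minimal sketch using `String` in place of Hadoop's `Path` so it runs standalone; the helper name is hypothetical, and whether `RecoveryLogsIterator` accepts a file in place of a directory is not settled here (the unit test described later in this thread treats reading a single file as an error case).

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper: adapts one user-supplied path to a List-typed parameter
// such as recoveryLogDirs. String stands in for org.apache.hadoop.fs.Path.
final class SingleDirList {

  static <T> List<T> of(T path) {
    // Immutable list holding exactly one element.
    return Collections.singletonList(path);
  }

  public static void main(String[] args) {
    List<String> dirs = of("/accumulo/recovery/abc");
    System.out.println(dirs.size() + " " + dirs.get(0)); // 1 /accumulo/recovery/abc
  }
}
```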







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665685677



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,12 +127,16 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
         if (fs.getFileStatus(path).isFile()) {
+          if (file.endsWith(".rf")) {

Review comment:
       I couldn't find one, and I see that other parts of the code check the file extension the same way I did.
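For reference, the dispatch being discussed can be sketched as a pure function over the two facts `LogReader` checks: whether the path is a directory and whether its name ends in `.rf`. A minimal sketch; the class and enum names and the example paths are hypothetical. Like `file.endsWith(".rf")` in the diff, the check is case-sensitive and relies on the naming convention rather than on file contents.

```java
// Hypothetical classification of a wal-info argument, mirroring the order of
// checks in the diff: directory first, then the ".rf" extension, else a raw WAL.
final class WalArg {

  enum Kind {
    RECOVERY_DIR, // sorted recovery directory: read with RecoveryLogsIterator
    SORTED_RFILE, // single sorted file: rejected with an error
    RAW_WAL       // single unsorted WAL: read sequentially from the stream
  }

  static Kind classify(boolean isDirectory, String name) {
    if (isDirectory) {
      return Kind.RECOVERY_DIR;
    }
    return name.endsWith(".rf") ? Kind.SORTED_RFILE : Kind.RAW_WAL;
  }

  public static void main(String[] args) {
    System.out.println(classify(true, "/accumulo/recovery/abc"));                  // RECOVERY_DIR
    System.out.println(classify(false, "/accumulo/recovery/abc/part-r-00000.rf")); // SORTED_RFILE
    System.out.println(classify(false, "/accumulo/wal/groot+9997/uuid"));          // RAW_WAL
  }
}
```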







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665727262



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -35,11 +36,13 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.RecoveryLogReader;

Review comment:
       That makes sense. I will keep it in this change and will add detail in the commit message once merged.







[GitHub] [accumulo] milleruntime commented on pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-872952860


   > > java.lang.OutOfMemoryError: Java heap space
   > > -XX:OnOutOfMemoryError="kill -9 %p"
   > > Executing /bin/sh -c "kill -9 41830"...
   > > [1]    41830 killed     accumulo wal-info
   > 
   > For the OOM issue, should I adjust heap space for wal-info inside accumulo-env.sh or is that something we should let the user decide on what heap space to give that particular utility? I tested upping the limit to 512m and did not run into the error.
   
   I think we should try to prevent the OOM error. It could be reading too much with the null range.
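Whatever the root cause turns out to be, the printing side can stay bounded by consuming the iterator one entry at a time and never collecting entries into a list. A minimal sketch; the helper name is hypothetical, and it does not address any buffering that may happen inside the iterator itself (for example, from reading with a null range).

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: streams entries to a sink without materializing them,
// stopping after maxEntries (a value <= 0 means no limit).
final class StreamingPrint {

  static <T> int forEachBounded(Iterator<T> it, int maxEntries, Consumer<? super T> sink) {
    int handled = 0;
    while (it.hasNext() && (maxEntries <= 0 || handled < maxEntries)) {
      sink.accept(it.next()); // only the current entry is referenced here
      handled++;
    }
    return handled;
  }

  public static void main(String[] args) {
    Iterator<String> events = List.of("OPEN", "DEFINE_TABLET", "MUTATION").iterator();
    int n = forEachBounded(events, 2, System.out::println); // prints OPEN and DEFINE_TABLET
    System.out.println(n + " printed");
  }
}
```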





[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665691886



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,12 +127,16 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
         if (fs.getFileStatus(path).isFile()) {
+          if (file.endsWith(".rf")) {
+            log.error(
+                "Can not read from a single rfile. Please pass in a directory for recovery logs.");
+            continue;
+          }
           // read log entries from a simple hdfs file
           try (final FSDataInputStream fsinput = fs.open(path);
               DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {

Review comment:
       This area of the code is still used by wal-info to read a single WAL file (I just tested to confirm). I will add comments, because I agree it is confusing.
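The single-WAL path keeps the original read-until-EOF loop: read a key, read a value, and stop at `EOFException`. A minimal, self-contained sketch of that pattern using plain `DataInputStream` calls in place of `LogFileKey.readFields` and `LogFileValue.readFields`; the record layout (an `int` followed by a UTF string) and the class name are hypothetical, and decryption (`DfsLogger.getDecryptingStream` in the diff) is left out. As in the original loop, a record truncated partway through also ends the read silently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader showing the read-until-EOF shape of the single-WAL loop.
final class ReadUntilEof {

  static List<String> readAll(DataInputStream in) throws IOException {
    List<String> out = new ArrayList<>();
    while (true) {
      try {
        int key = in.readInt();      // stands in for key.readFields(input)
        String value = in.readUTF(); // stands in for value.readFields(input)
        out.add(key + "=" + value);
      } catch (EOFException e) {
        break; // end of stream (or a truncated final record)
      }
    }
    return out;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(1);
      out.writeUTF("OPEN");
      out.writeInt(2);
      out.writeUTF("DEFINE_TABLET");
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      System.out.println(readAll(in)); // [1=OPEN, 2=DEFINE_TABLET]
    }
  }
}
```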







[GitHub] [accumulo] Manno15 edited a comment on pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 edited a comment on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-878653866


   I added a unit test in my latest commit. I ultimately decided to keep it simple, since `SortedLogRecoveryTest` is more complex and recovery already uses RLI, so I feel RLI is adequately tested there. I do, however, test RLI features such as validateFirstKey. I also test the error cases, such as trying to read a single file and trying to iterate through a directory that doesn't contain the finished marker.
   
   EDIT: Looking into the build error now.
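On testing the missing-marker case: in `testFinishMarker` as quoted further down this thread, `fail(...)` sits inside the `while (rli.hasNext())` loop, so that test would also pass if the constructor did not throw and the iterator were simply empty. A minimal sketch of a check that cannot pass silently; the names are hypothetical and the marker file name `finished` is an assumption. With JUnit 4.13 or later, `Assert.assertThrows(IOException.class, () -> ...)` expresses the same intent.

```java
import java.io.IOException;
import java.util.Set;

// Hypothetical stand-in for the finished-marker check; the marker name is an assumption.
final class FinishedMarkerCheck {
  static final String FINISHED = "finished";

  /** Throws if a recovery directory's listing lacks the finished marker. */
  static void requireFinished(String dir, Set<String> fileNames) throws IOException {
    if (!fileNames.contains(FINISHED)) {
      throw new IOException("Sorting of " + dir + " is not finished: no '" + FINISHED + "' marker");
    }
  }

  /** Test-style helper: true only if the check throws, so "no exception" cannot pass. */
  static boolean throwsIOException(String dir, Set<String> fileNames) {
    try {
      requireFinished(dir, fileNames);
      return false; // reached only when no exception was thrown
    } catch (IOException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(throwsIOException("/recovery/a", Set.of("part-r-00000.rf")));           // true
    System.out.println(throwsIOException("/recovery/b", Set.of("part-r-00000.rf", FINISHED))); // false
  }
}
```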





[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665710565



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -35,11 +36,13 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.RecoveryLogReader;

Review comment:
       I went ahead and added it to this PR but I can remove it. 







[GitHub] [accumulo] Manno15 commented on pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-873027870


   > I think we should try and prevent the OOM error. It could be reading too much with the null range.
   
   I will investigate further, but if I recall correctly from my testing, the OOM happens fairly early in the operation, so I am not sure that is the cause.





[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r669098470



##########
File path: server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
+public class RecoveryLogsIteratorTest {
+
+  private VolumeManager fs;
+  private File workDir;
+  static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
+  static ServerContext context;
+  static LogSorter logSorter;
+
+  @Rule
+  public TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @Before
+  public void setUp() throws Exception {
+    context = createMock(ServerContext.class);
+    logSorter = new LogSorter(context, DefaultConfiguration.getInstance());
+
+    workDir = tempFolder.newFolder();
+    String path = workDir.getAbsolutePath();
+    assertTrue(workDir.delete());
+    fs = VolumeManagerImpl.getLocalForTesting(path);
+    expect(context.getVolumeManager()).andReturn(fs).anyTimes();
+    expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+        .anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+    replay(context);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.close();
+  }
+
+  static class KeyValue implements Comparable<KeyValue> {
+    public final LogFileKey key;
+    public final LogFileValue value;
+
+    KeyValue() {
+      key = new LogFileKey();
+      value = new LogFileValue();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(key) + Objects.hashCode(value);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return this == obj || (obj instanceof KeyValue && 0 == compareTo((KeyValue) obj));
+    }
+
+    @Override
+    public int compareTo(KeyValue o) {
+      return key.compareTo(o.key);
+    }
+  }
+
+  @Test
+  public void testSimpleRLI() throws IOException {
+    KeyValue keyValue = new KeyValue();
+    keyValue.key.event = DEFINE_TABLET;
+    keyValue.key.seq = 0;
+    keyValue.key.tabletId = 1;
+    keyValue.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, true);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) {
+      while (rli.hasNext()) {
+        Entry<LogFileKey,LogFileValue> entry = rli.next();
+        assertEquals("TabletId does not match", 1, entry.getKey().tabletId);
+        assertEquals("Event does not match", DEFINE_TABLET, entry.getKey().event);
+      }
+    }
+  }
+
+  @Test
+  public void testFinishMarker() throws IOException {
+    KeyValue keyValue = new KeyValue();
+    keyValue.key.event = DEFINE_TABLET;
+    keyValue.key.seq = 0;
+    keyValue.key.tabletId = 1;
+    keyValue.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, false);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) {
+      while (rli.hasNext()) {
+        fail("Finish marker should not be found. Exception should have been thrown.");
+      }
+    } catch (IOException e) {
+      // Expected exception
+    }
+  }

Review comment:
       Made this change in my latest commit.







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665682442



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -35,11 +36,13 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.RecoveryLogReader;

Review comment:
       That was deleted on my end. Interesting. 







[GitHub] [accumulo] Manno15 commented on pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-871625604


   After some discussion, the goals of this PR shifted away from keeping backwards compatibility. It now also requires the user to pass in a directory rather than allowing specific files to be passed in.





[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665683334



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -56,15 +56,20 @@
   private final List<Scanner> scanners;
   private final Iterator<Entry<Key,Value>> iter;
 
+  public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs,
+      boolean checkFirstKey) throws IOException {
+    this(context, recoveryLogDirs, null, null, checkFirstKey);
+  }

Review comment:
       It is probably best to delete it and pass in null here. The only reason I added it was to stay consistent with how `RecoveryLogReader` did a rangeless read.
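The trade-off is between a convenience overload that only forwards `null` bounds and a single constructor whose callers pass `null` explicitly, as the unit test quoted elsewhere in this thread does with `new RecoveryLogsIterator(context, dirs, null, null, false)`. A minimal sketch with a hypothetical class, assuming `null` start/end mean "unbounded" as in a rangeless read.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader, not Accumulo's RecoveryLogsIterator: null start/end bounds
// mean "unbounded", so passing null for both reads everything.
final class RangedReader {
  private final List<Integer> keys;
  private final Integer start; // inclusive lower bound, or null for none
  private final Integer end;   // inclusive upper bound, or null for none

  RangedReader(List<Integer> keys, Integer start, Integer end) {
    this.keys = List.copyOf(keys);
    this.start = start;
    this.end = end;
  }

  // Convenience overload of the kind under discussion: it only forwards nulls.
  RangedReader(List<Integer> keys) {
    this(keys, null, null);
  }

  List<Integer> read() {
    List<Integer> out = new ArrayList<>();
    for (int k : keys) {
      boolean aboveStart = start == null || k >= start;
      boolean belowEnd = end == null || k <= end;
      if (aboveStart && belowEnd) {
        out.add(k);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Integer> keys = List.of(1, 2, 3, 4);
    System.out.println(new RangedReader(keys).read());             // [1, 2, 3, 4]
    System.out.println(new RangedReader(keys, null, null).read()); // [1, 2, 3, 4]
    System.out.println(new RangedReader(keys, 2, 3).read());       // [2, 3]
  }
}
```

Dropping the overload leaves one constructor to maintain and makes the "read everything" choice visible at each call site, at the cost of bare `null` arguments.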







[GitHub] [accumulo] Manno15 commented on pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-873093928


   For reference, here is the full output when heap size is increased:
   
   <details>
   
   ```
   OPEN 54a8593a-310b-48a9-933c-d97ec372a7eb
   
   DEFINE_TABLET 1 1 +r<<
   
   DEFINE_TABLET 1 3 +r<<
   
   DEFINE_TABLET 1 5 +r<<
   
   DEFINE_TABLET 3 1 !0;~<
   
   DEFINE_TABLET 4 1 1<<
   
   DEFINE_TABLET 5 1 2<<
   
   COMPACTION_START 1 2 hdfs://groot:8020/accumulo/tables/+r/root_tablet/F0000002.rf
   
   COMPACTION_FINISH 1 3
   
   COMPACTION_START 1 4 hdfs://groot:8020/accumulo/tables/+r/root_tablet/F0000006.rf
   
   COMPACTION_FINISH 1 5
   
   COMPACTION_START 3 2 hdfs://groot:8020/accumulo/tables/!0/table_info/F0000001.rf
   
   COMPACTION_FINISH 3 3
   
   COMPACTION_START 3 4 hdfs://groot:8020/accumulo/tables/!0/table_info/F0000005.rf
   
   COMPACTION_FINISH 3 5
   
   COMPACTION_START 5 2 hdfs://groot:8020/accumulo/tables/2/default_tablet/F0000000.rf
   
   COMPACTION_FINISH 5 3
   
   COMPACTION_START 5 4 hdfs://groot:8020/accumulo/tables/2/default_tablet/F0000003.rf
   
   COMPACTION_FINISH 5 5
   
   COMPACTION_START 5 6 hdfs://groot:8020/accumulo/tables/2/default_tablet/F0000004.rf
   
   COMPACTION_FINISH 5 7
   
   MANY_MUTATIONS 1 1
   2 mutations:
     !0;~
         suspend:loc [system]:1 [] <deleted>
         future:1000051b34d0004 [system]:1 [] groot:9997
     !0<
         suspend:loc [system]:2 [] <deleted>
         future:1000051b34d0004 [system]:2 [] groot:9997
   
   MUTATION 1 1
   2 mutations:
     !0<
         loc:1000051b34d0004 [system]:3 [] groot:9997
         future:1000051b34d0004 [system]:3 [] <deleted>
         suspend:loc [system]:3 [] <deleted>
     !0;~
         loc:1000051b34d0004 [system]:4 [] groot:9997
         future:1000051b34d0004 [system]:4 [] <deleted>
         suspend:loc [system]:4 [] <deleted>
   
   MUTATION 1 3
   1 mutations:
     !0;~
         file:hdfs://groot:8020/accumulo/tables/!0/table_info/F0000001.rf [system]:5 [] 590,14
         srv:time [system]:5 [] L6
         last:1000051b34d0004 [system]:5 [] groot:9997
         srv:flush [system]:5 [] 0
         srv:lock [system]:5 [] tservers/groot:9997/zlock#fe1ada18-f3ef-415f-b60a-592e5e8d5563#0000000000$1000051b34d0004
   
   MUTATION 1 5
   1 mutations:
     !0;~
         file:hdfs://groot:8020/accumulo/tables/!0/table_info/F0000005.rf [system]:6 [] 601,6
         srv:time [system]:6 [] L8
         last:1000051b34d0004 [system]:6 [] groot:9997
         srv:flush [system]:6 [] 0
         srv:lock [system]:6 [] tservers/groot:9997/zlock#fe1ada18-f3ef-415f-b60a-592e5e8d5563#0000000000$1000051b34d0004
   
   MUTATION 3 1
   6 mutations:
     1<
         ~tab:~pr [system]:1 [] 
         srv:dir [system]:1 [] default_tablet
         srv:time [system]:1 [] M0
         srv:lock [system]:1 [] managers/lock/zlock#e2c30677-5926-4615-9df1-b0a667569b00#0000000000$1000051b34d0009
     1<
         suspend:loc [system]:2 [] <deleted>
         future:1000051b34d0004 [system]:2 [] groot:9997
     1<
         loc:1000051b34d0004 [system]:3 [] groot:9997
         future:1000051b34d0004 [system]:3 [] <deleted>
         suspend:loc [system]:3 [] <deleted>
     2<
         ~tab:~pr [system]:4 [] 
         srv:dir [system]:4 [] default_tablet
         srv:time [system]:4 [] M0
         srv:lock [system]:4 [] managers/lock/zlock#e2c30677-5926-4615-9df1-b0a667569b00#0000000000$1000051b34d0009
     2<
         suspend:loc [system]:5 [] <deleted>
         future:1000051b34d0004 [system]:5 [] groot:9997
   ...
   MUTATION 3 3
   2 mutations:
     2<
         file:hdfs://groot:8020/accumulo/tables/2/default_tablet/F0000000.rf [system]:7 [] 10285525,395990
         srv:time [system]:7 [] M1625236693271
         last:1000051b34d0004 [system]:7 [] groot:9997
         srv:flush [system]:7 [] 0
         srv:lock [system]:7 [] tservers/groot:9997/zlock#fe1ada18-f3ef-415f-b60a-592e5e8d5563#0000000000$1000051b34d0004
     2<
         file:hdfs://groot:8020/accumulo/tables/2/default_tablet/F0000003.rf [system]:8 [] 15817470,604010
         srv:time [system]:8 [] M1625236697022
         last:1000051b34d0004 [system]:8 [] groot:9997
         srv:flush [system]:8 [] 0
         srv:lock [system]:8 [] tservers/groot:9997/zlock#fe1ada18-f3ef-415f-b60a-592e5e8d5563#0000000000$1000051b34d0004
   
   MUTATION 3 5
   1 mutations:
     2<
         file:hdfs://groot:8020/accumulo/tables/2/default_tablet/F0000004.rf [system]:9 [] 16423100,453276
         srv:time [system]:9 [] M1625236700134
         last:1000051b34d0004 [system]:9 [] groot:9997
         srv:flush [system]:9 [] 0
         srv:lock [system]:9 [] tservers/groot:9997/zlock#fe1ada18-f3ef-415f-b60a-592e5e8d5563#0000000000$1000051b34d0004
   
   MANY_MUTATIONS 4 1
   2 mutations:
     563864db09fcf1ed
         span:[879f12653b2754eb]:a5c902a56d5643c5 [system]:1625236703647 [] localhosttserver<binary data omitted>start
     idx:tserver:17a67a7fcf9
         start:localhost [system]:1625236703647 [] 563864db09fcf1ed:105
   
   MANY_MUTATIONS 5 1
   395990 mutations:
     00001e46cd9dbb6d
         33f1:6f7b [system]:1625236692263 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:000000000000f30f::60ef982f
     0002ef551d05b9ef
         5e53:4d00 [system]:1625236692263 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000008367::a17a26cd
     0003614899fb7e09
         64b3:2f64 [system]:1625236692263 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000011be2::e068d7a2
     0003ae1440ced624
         2394:1b9d [system]:1625236692263 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:000000000000d306::8ac2c2fb
     0003ae69a3481887
         4810:7226 [system]:1625236692263 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:000000000000b211::85b1994f
   ...
   MANY_MUTATIONS 5 3
   208020 mutations:
     32464fcaeb797b43
         7cf2:0959 [system]:1625236695399 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000c15ac::2fdc324f
     0b82924a76aad40f
         68a1:0e80 [system]:1625236695399 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000c15ad::a68e8831
     0f29c7e66a8c6606
         6753:6abc [system]:1625236695399 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000c15ae::5048f32a
     5e434005727b80c6
         1021:4341 [system]:1625236695399 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000c15af::54571bd4
     51762841b00ece87
         240f:457c [system]:1625236695399 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000c15b0::95dc1793
   ...
   MANY_MUTATIONS 5 3
   395990 mutations:
     29f338c7360f1d7b
         7fbc:00df [system]:1625236693566 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000060ad6::c4f27f03
     354c04e63944a06e
         21d7:51c6 [system]:1625236693566 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000060ad7::40341a31
     5ca1b0f76f317baa
         4434:182d [system]:1625236693566 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000060ad8::63334983
     7a64b29ae63ed4b7
         18f3:295b [system]:1625236693566 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000060ad9::96038494
     7ff576d10fa3d1bd
         34ba:51ee [system]:1625236693566 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000060ada::027dd2a8
   ...
   MANY_MUTATIONS 5 5
   453276 mutations:
     4921f22f02b58d14
         3468:5685 [system]:1625236697323 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000f4240:46545744488120eb:b1bc14a8
     0679069f15cd67fb
         0638:724b [system]:1625236697323 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000f4241:65bff8ad2b27763a:f25897f9
     44529c6226d3950d
         6495:14ae [system]:1625236697323 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000f4242:4e0a5c7eb0b27e7e:089c33ed
     637acdf343c0b9d9
         38e8:6944 [system]:1625236697323 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000f4243:02df37a5a45040f8:fcd2cac2
     2aa310518bcca498
         3456:0b91 [system]:1625236697323 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000000f4244:3158d0931f445890:29deea07
   ...
   MANY_MUTATIONS 5 7
   377730 mutations:
     04c44e1aabc3dc4e
         22ce:47de [system]:1625236700973 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000001753f6:252071511bfa33c9:b2d2068a
     435ec0daef4a2566
         3425:6c1a [system]:1625236700973 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000001753f7:2df4ee72bf6a996b:87ea4301
     278087d17016d2f1
         0a4d:3622 [system]:1625236700973 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000001753f8:64c72d82f7d4846b:00931bdb
     58acc86753d592a0
         2e76:020f [system]:1625236700973 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000001753f9:57ac1858c4b0e62a:d81c54e1
     041cd3e3468f6d43
         2de7:51e0 [system]:1625236700973 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:00000000001753fa:6eb09bace7089627:1f3abf52
   ...
   MANY_MUTATIONS 5 7
   75546 mutations:
     339356b860d4a8a6
         1689:1937 [system]:1625236700436 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000162cdc:5c50fd0732b857cb:ad4e580e
     4d1d70c2a055aadb
         69f7:2a5e [system]:1625236700436 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000162cdd:479b556f4f7233fc:9df3bd39
     2292ff1fcb744e31
         1508:71d6 [system]:1625236700436 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000162cde:61ec957f4a4df758:cae1fbd5
     1746f7c0307593e1
         1c89:35c9 [system]:1625236700436 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000162cdf:34103199d6b718ca:5cc485df
     31c48b8ac80bd581
         51c3:27ba [system]:1625236700436 [] 4f2d016e-77dd-4422-ab3c-012bf81aff68:0000000000162ce0:43bd5015f4da1296:ffa29c69
   ...
   ```
   
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] milleruntime commented on a change in pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r660776436



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,33 +130,44 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
+        // read old style WALs
+        if (containsMapFile(fs, path)) {

Review comment:
       I think this check belongs in the `else` below, which handles the temporary sorted WALs. The first `isFile()` check takes care of reading the main WALs (the files in /accumulo/wal), and that hasn't changed. The `else` below should read the files in /accumulo/recovery.
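A minimal sketch of the split described above. It assumes local `java.nio.file` paths stand in for Hadoop's `Path`/`FileSystem` (the real `LogReader` calls `fs.getFileStatus(path).isFile()`), and `WalInputKind`, `Kind` and `classify` are hypothetical names, not Accumulo API:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper illustrating the review suggestion; not Accumulo API.
public class WalInputKind {

  enum Kind {
    RAW_WAL, // a single, unsorted WAL file (e.g. under /accumulo/wal)
    SORTED_RECOVERY_DIR, // a directory of sorted logs (e.g. under /accumulo/recovery)
    MISSING // neither a regular file nor a directory
  }

  static Kind classify(Path path) {
    if (Files.isRegularFile(path)) {
      return Kind.RAW_WAL;
    }
    if (Files.isDirectory(path)) {
      return Kind.SORTED_RECOVERY_DIR;
    }
    return Kind.MISSING;
  }

  public static void main(String[] args) {
    // print how each argument would be routed
    for (String arg : args) {
      System.out.println(arg + " -> " + classify(Paths.get(arg)));
    }
  }
}
```

Keeping the file/directory decision in the caller means the iterator itself never has to guess what kind of input it was given.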







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665700422



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,12 +127,16 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
         if (fs.getFileStatus(path).isFile()) {
+          if (file.endsWith(".rf")) {
+            log.error(
+                "Can not read from a single rfile. Please pass in a directory for recovery logs.");
+            continue;
+          }
           // read log entries from a simple hdfs file
           try (final FSDataInputStream fsinput = fs.open(path);
               DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {

Review comment:
       I moved the comment up and tried to be more explicit in my latest commit. Let me know if you think I should add more information to it.







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r660799976



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,33 +130,44 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
+        // read old style WALs
+        if (containsMapFile(fs, path)) {

Review comment:
       That's how it was, but I ran into issues reading the RFile with `accumulo wal-info`. It was simpler to determine which WAL format (old-style or new) to use before checking whether the user passed in a file or a directory.
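One way to make that format decision up front is sketched below. It assumes old-style sorted WALs were written as Hadoop MapFiles, each of which is a directory holding `index` and `data` files, and it uses local `java.nio.file` paths in place of HDFS. The PR's actual `containsMapFile(fs, path)` may be implemented differently; `OldStyleWalCheck` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical sketch; the real check runs against Hadoop's FileSystem.
public class OldStyleWalCheck {

  // Assumption: a Hadoop MapFile is a directory with "index" and "data" files.
  static boolean isMapFile(Path p) {
    return Files.isDirectory(p) && Files.isRegularFile(p.resolve("index"))
        && Files.isRegularFile(p.resolve("data"));
  }

  // True if the path itself, or any direct child of it, looks like a MapFile,
  // i.e. the old sorted-WAL layout rather than RFiles.
  static boolean containsMapFile(Path path) throws IOException {
    if (!Files.isDirectory(path)) {
      return false; // a single regular file can't be a MapFile
    }
    if (isMapFile(path)) {
      return true;
    }
    try (Stream<Path> children = Files.list(path)) {
      return children.anyMatch(OldStyleWalCheck::isMapFile);
    }
  }
}
```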







[GitHub] [accumulo] Manno15 edited a comment on pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 edited a comment on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-878653866


   I added a unit test in my latest commit. I decided to keep it fairly simple: recovery uses RLI (`RecoveryLogsIterator`), and the more complex `SortedLogRecoveryTest` already exercises recovery, so I feel RLI is adequately tested there. I do, however, test RLI features such as `validateFirstKey`. I also test the error cases, such as trying to read a single file and iterating through a directory that doesn't contain the finished marker.
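The "missing finished marker" error case can be illustrated with the plain-Java sketch below. It assumes the sort step signals completion by writing a file named `finished` into the recovery directory; `FinishedMarkerCheck` and `requireFinished` are hypothetical, and this is not the PR's actual test code.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical illustration of the error case; not Accumulo API.
public class FinishedMarkerCheck {

  // Assumed name of the marker file written once sorting completes.
  static final String FINISHED_MARKER = "finished";

  // Throws if the recovery directory has no finished marker.
  static void requireFinished(Path recoveryDir) throws IOException {
    if (!Files.isRegularFile(recoveryDir.resolve(FINISHED_MARKER))) {
      throw new IOException(
          "Sort of " + recoveryDir + " is not finished; no '" + FINISHED_MARKER + "' marker");
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("recovery");
    try {
      requireFinished(dir);
      System.out.println("unexpected: no error");
    } catch (IOException expected) {
      System.out.println("error case ok: " + expected.getMessage());
    }
    Files.createFile(dir.resolve(FINISHED_MARKER));
    requireFinished(dir); // no exception once the marker exists
    System.out.println("finished dir accepted");
  }
}
```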
   





[GitHub] [accumulo] Manno15 edited a comment on pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 edited a comment on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-872436542


   > java.lang.OutOfMemoryError: Java heap space
   > -XX:OnOutOfMemoryError="kill -9 %p"
   > Executing /bin/sh -c "kill -9 41830"...
   > [1]    41830 killed     accumulo wal-info
   
   For the OOM issue, should I adjust the heap size for wal-info in accumulo-env.sh, or should we let users decide how much heap to give that utility? I tested raising the limit to 512m and did not run into the error.





[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r664889791



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       My mistake, I thought we previously discussed removing it. I will add it back. 







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665638698



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -56,15 +56,20 @@
   private final List<Scanner> scanners;
   private final Iterator<Entry<Key,Value>> iter;
 
+  public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs,
+      boolean checkFirstKEy) throws IOException {
+    this(context, recoveryLogDirs, null, null, checkFirstKEy);
+  }
+
   /**
    * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+  public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
       LogFileKey end, boolean checkFirstKey) throws IOException {
 
     List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
     scanners = new ArrayList<>();
-    Range range = LogFileKey.toRange(start, end);
+    Range range = start != null ? LogFileKey.toRange(start, end) : null;

Review comment:
       This is more readable without the negations:
   
   ```suggestion
       Range range = start == null ? null : LogFileKey.toRange(start, end);
   ```

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -35,11 +36,13 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.RecoveryLogReader;

Review comment:
       Is the old RecoveryLogReader not used after this? Can it be deleted?

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -56,15 +56,20 @@
   private final List<Scanner> scanners;
   private final Iterator<Entry<Key,Value>> iter;
 
+  public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs,
+      boolean checkFirstKEy) throws IOException {
+    this(context, recoveryLogDirs, null, null, checkFirstKEy);
+  }

Review comment:
       It seems like this new constructor was added for convenience, but it doesn't have a javadoc to explain how it differs from the other constructor. If your goal is to make it crystal clear when you would use these additional parameters and when you don't need them, instead of using constructors here you could use static methods whose names are descriptive in the API.
   
   So, instead of the following in the calling code:
   
   ```java
   var iter = new RecoveryLogsIterator(context, dirs, start, stop, false);
   var iter2 = new RecoveryLogsIterator(context, dirs, true);
   ```
   
   it could look like:
   
   ```java
   var iter = RecoveryLogsIterator.scanRange(context, dirs, start, stop, false);
   var iter2 = RecoveryLogsIterator.scanAll(context, dirs); // don't think you need the boolean for this case
   ```
   
   Then, you can make the constructor private (or package-private, if you need it visible for testing), and the implementing static methods can do some additional parameter validation to ensure that, when you specify a range, neither endpoint is null.
   
   These are just general design tips, though. For this, since the new constructor is only ever used once, I'd probably just remove it and pass in nulls on the old constructor for the one time it is called. Minimally, if you're keeping the new constructor, it should have a javadoc (and you should double check to see if you actually need to pass the boolean, since I don't think it matters when the first key is null).
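The static-factory alternative described above might look like the sketch below. `LogScan` is a hypothetical stand-in for `RecoveryLogsIterator`, with `String` endpoints in place of `LogFileKey` and no actual file scanning; it only shows the private constructor, descriptive factory names, and parameter validation.

```java
import java.util.Objects;

// Hypothetical stand-in for RecoveryLogsIterator; not the real class.
public final class LogScan {

  private final String start; // null means no range: scan everything
  private final String end;
  private final boolean checkFirstKey;

  // Private: callers must pick one of the descriptively named factories.
  private LogScan(String start, String end, boolean checkFirstKey) {
    this.start = start;
    this.end = end;
    this.checkFirstKey = checkFirstKey;
  }

  /** Scans only entries in [start, end]; both endpoints are required. */
  public static LogScan scanRange(String start, String end, boolean checkFirstKey) {
    Objects.requireNonNull(start, "start");
    Objects.requireNonNull(end, "end");
    return new LogScan(start, end, checkFirstKey);
  }

  /** Scans every entry. Following the review suggestion, the boolean is dropped here. */
  public static LogScan scanAll() {
    return new LogScan(null, null, false);
  }

  public boolean isFullScan() {
    return start == null;
  }

  public boolean checksFirstKey() {
    return checkFirstKey;
  }

  public String describe() {
    return isFullScan() ? "all" : "[" + start + "," + end + "]";
  }
}
```

With this shape, a call site such as `LogScan.scanAll()` documents its intent without a javadoc lookup, and passing a half-specified range fails fast.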

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,12 +127,16 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
         if (fs.getFileStatus(path).isFile()) {
+          if (file.endsWith(".rf")) {
+            log.error(
+                "Can not read from a single rfile. Please pass in a directory for recovery logs.");
+            continue;
+          }
           // read log entries from a simple hdfs file
           try (final FSDataInputStream fsinput = fs.open(path);
               DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {

Review comment:
       Given this error message and new behavior, it's not clear what this condition (a single file that doesn't end in ".rf") is supposed to handle. Is this for reading the older sequence files? A comment could go a long way here. The only comment it has is "read log entries from a simple hdfs file". Clearly, this branch can't read a simple RFile in HDFS, so that comment could be updated to make it clear which files it *can* read (i.e., what does "simple hdfs file" mean?).

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,12 +127,16 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
         if (fs.getFileStatus(path).isFile()) {
+          if (file.endsWith(".rf")) {

Review comment:
       There's probably a constant for the RFile extension somewhere that could be used... not that it would change.
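A minimal sketch of replacing the `".rf"` literal with a named constant. The local `RFILE_EXTENSION` is an assumption standing in for a shared constant; Accumulo core appears to define `RFile.EXTENSION` for this purpose, but verify it exists in the target version before relying on it.

```java
// Hypothetical helper; prefer an existing shared constant if one is available.
public class RFileNames {

  static final String RFILE_EXTENSION = "rf";

  // Matches "name.rf" but not "name.rfx" or a bare "rf".
  static boolean isRFile(String fileName) {
    return fileName.endsWith("." + RFILE_EXTENSION);
  }

  public static void main(String[] args) {
    // example inputs based on paths from the wal-info output
    System.out.println(isRFile("hdfs://groot:8020/accumulo/tables/2/default_tablet/F0000000.rf")); // true
    System.out.println(isRFile("hdfs://groot:8020/accumulo/tables/2/default_tablet")); // false
  }
}
```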







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665733606



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,13 +127,18 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
+        // read log entries from a single WAL file.
         if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
+          if (file.endsWith(".rf")) {
+            log.error(
+                "Can not read from a single rfile. Please pass in a directory for recovery logs.");

Review comment:
       ```suggestion
                   "Unable to read from a single RFile. A non-sorted WAL file was expected. To read sorted WALs, please pass in a directory containing the sorted recovery logs.");
   ```

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,12 +127,16 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
         if (fs.getFileStatus(path).isFile()) {
+          if (file.endsWith(".rf")) {

Review comment:
       Based on the updated comments, I can see that the first case is for reading regular non-sorted WAL files, and not the sorted ones, whereas the `else` block is for reading sorted WAL files from a single directory. I think that could be made even more clear:
   
   ```suggestion
             // make sure it's a regular non-sorted WAL file, and not a single sorted WAL in RFile format
             if (file.endsWith(".rf")) {
   ```
   
   Thinking about this a bit, it's a little weird that we can't read a single sorted WAL file. We can read a directory containing one, but we can't read it by itself? That doesn't seem to be a useful restriction for this utility, which is primarily intended to be a troubleshooting utility. If we pass in a single RFile, I feel like this tool should still be able to read it. That could be done here or in a separate PR, unless there's a reason to not support that at all.
   







[GitHub] [accumulo] Manno15 commented on pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-878653866


   I added a unit test in my latest commit. I decided to keep it fairly simple: recovery uses RLI (`RecoveryLogsIterator`), and the more complex `SortedLogRecoveryTest` already exercises recovery, so I feel RLI is adequately tested there. I do, however, test RLI features such as `validateFirstKey`. I also test the error cases, such as trying to read a single file and iterating through a directory that doesn't contain the finished marker.





[GitHub] [accumulo] ctubbsii commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665723342



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,12 +127,16 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
         LogFileKey key = new LogFileKey();
         LogFileValue value = new LogFileValue();
 
         if (fs.getFileStatus(path).isFile()) {
+          if (file.endsWith(".rf")) {

Review comment:
       It's fine. It's not a problem.







[GitHub] [accumulo] milleruntime commented on a change in pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r660768352



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -56,42 +56,57 @@
   private final List<Scanner> scanners;
   private final Iterator<Entry<Key,Value>> iter;
 
+  public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs,
+      boolean checkFirstKEy) throws IOException {
+    this(context, recoveryLogDirs, null, null, checkFirstKEy);
+  }
+
   /**
    * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+  public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
       LogFileKey end, boolean checkFirstKey) throws IOException {
 
     List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
     scanners = new ArrayList<>();
-    Range range = LogFileKey.toRange(start, end);
+    Range range = start != null ? LogFileKey.toRange(start, end) : null;
     var vm = context.getVolumeManager();
 
     for (Path logDir : recoveryLogDirs) {
       LOG.debug("Opening recovery log dir {}", logDir.getName());
-      List<Path> logFiles = getFiles(vm, logDir);
       var fs = vm.getFileSystemByPath(logDir);
 
-      // only check the first key once to prevent extra iterator creation and seeking
-      if (checkFirstKey) {
-        validateFirstKey(context, fs, logFiles, logDir);
-      }
-
-      for (Path log : logFiles) {
-        var scanner = RFile.newScanner().from(log.toString()).withFileSystem(fs)
-            .withTableProperties(context.getConfiguration()).build();
-
-        scanner.setRange(range);
+      // if path passed in is actually a file, read a single RFile
+      if (vm.getFileStatus(logDir).isFile()) {

Review comment:
       This should be in LogReader. I don't think we want this check in `RecoveryLogsIterator` since it is used in very specific ways in the recovery code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r669076996



##########
File path: server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
+public class RecoveryLogsIteratorTest {
+
+  private VolumeManager fs;
+  private File workDir;
+  static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
+  static ServerContext context;
+  static LogSorter logSorter;
+
+  @Rule
+  public TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @Before
+  public void setUp() throws Exception {
+    context = createMock(ServerContext.class);
+    logSorter = new LogSorter(context, DefaultConfiguration.getInstance());
+
+    workDir = tempFolder.newFolder();
+    String path = workDir.getAbsolutePath();
+    assertTrue(workDir.delete());
+    fs = VolumeManagerImpl.getLocalForTesting(path);
+    expect(context.getVolumeManager()).andReturn(fs).anyTimes();
+    expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+        .anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+    replay(context);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.close();
+  }
+
+  static class KeyValue implements Comparable<KeyValue> {
+    public final LogFileKey key;
+    public final LogFileValue value;
+
+    KeyValue() {
+      key = new LogFileKey();
+      value = new LogFileValue();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(key) + Objects.hashCode(value);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return this == obj || (obj instanceof KeyValue && 0 == compareTo((KeyValue) obj));
+    }
+
+    @Override
+    public int compareTo(KeyValue o) {
+      return key.compareTo(o.key);
+    }
+  }
+
+  @Test
+  public void testSimpleRLI() throws IOException {
+    KeyValue keyValue = new KeyValue();
+    keyValue.key.event = DEFINE_TABLET;
+    keyValue.key.seq = 0;
+    keyValue.key.tabletId = 1;
+    keyValue.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, true);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) {
+      while (rli.hasNext()) {
+        Entry<LogFileKey,LogFileValue> entry = rli.next();
+        assertEquals("TabletId does not match", 1, entry.getKey().tabletId);
+        assertEquals("Event does not match", DEFINE_TABLET, entry.getKey().event);
+      }
+    }
+  }
+
+  @Test
+  public void testFinishMarker() throws IOException {
+    KeyValue keyValue = new KeyValue();
+    keyValue.key.event = DEFINE_TABLET;
+    keyValue.key.seq = 0;
+    keyValue.key.tabletId = 1;
+    keyValue.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, false);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) {
+      while (rli.hasNext()) {
+        fail("Finish marker should not be found. Exception should have been thrown.");
+      }
+    } catch (IOException e) {
+      // Expected exception
+    }
+  }

Review comment:
       It may be possible to use assertThrows here, something like:
   ```suggestion
       assertThrows("Finish marker should not be found", IOException.class, () -> new RecoveryLogsIterator(context, dirs, null, null, false));
     }
   ```
   This code would be slightly different if the expected exception is thrown in the `hasNext()` method instead of in the constructor, but the idea is the same.
   
   (same comment for the other occurrences of `fail()` and `try-catch` to handle expected exceptions elsewhere in this class)
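Both variants the reviewer describes can be sketched without pulling in JUnit. In the sketch below, `expectThrows` is a hypothetical, minimal stand-in for JUnit 4.13's `Assert.assertThrows`. `failingConstructor()` models the constructor case, and `lazilyFailing()` is a made-up iterator that is built successfully and only fails when `hasNext()` is called. Because `java.util.Iterator.hasNext()` cannot throw a checked `IOException`, the lazy case wraps the error in `UncheckedIOException`. The thread does not establish whether `RecoveryLogsIterator` ever fails this way.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;

final class ExpectedExceptionSketch {
  private ExpectedExceptionSketch() {}

  /** Like JUnit's ThrowingRunnable: a block that may throw anything. */
  interface ThrowingRunnable {
    void run() throws Throwable;
  }

  /** Hypothetical stand-in for Assert.assertThrows: returns the expected exception or fails. */
  static <T extends Throwable> T expectThrows(Class<T> expected, ThrowingRunnable block) {
    try {
      block.run();
    } catch (Throwable actual) {
      if (expected.isInstance(actual)) {
        return expected.cast(actual);
      }
      throw new AssertionError("Unexpected exception type: " + actual, actual);
    }
    throw new AssertionError("Expected " + expected.getSimpleName() + " but nothing was thrown");
  }

  /** Constructor-style failure: the error surfaces as soon as the object is built. */
  static Iterator<String> failingConstructor() throws IOException {
    throw new IOException("no finish marker");
  }

  /** hasNext()-style failure: construction succeeds, iteration fails. */
  static Iterator<String> lazilyFailing() {
    return new Iterator<String>() {
      @Override
      public boolean hasNext() {
        throw new UncheckedIOException(new IOException("no finish marker"));
      }

      @Override
      public String next() {
        throw new IllegalStateException("hasNext() always fails");
      }
    };
  }
}
```

The constructor case reads `expectThrows(IOException.class, ExpectedExceptionSketch::failingConstructor)`, while the `hasNext()` case reads `expectThrows(UncheckedIOException.class, () -> lazilyFailing().hasNext())`. Only the lambda body changes, which is the reviewer's point.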







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665699564



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -35,11 +36,13 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.RecoveryLogReader;

Review comment:
       It does appear that RecoveryLogReader is only used in its test now. Should I remove it in this PR or a new one? 
   
   (Sorry about my original reply to this; I misunderstood what you meant.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] ctubbsii commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r669104267



##########
File path: server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
##########
@@ -161,24 +161,20 @@ public void testFinishMarker() throws IOException {
 
     createRecoveryDir(logs, dirs, false);
 
-    assertThrows("Finish marker should not be found", IOException.class, () -> new RecoveryLogsIterator(context, dirs, null, null, false));
+    assertThrows("Finish marker should not be found", IOException.class,
+        () -> new RecoveryLogsIterator(context, dirs, null, null, false));
   }
 
   @Test
   public void testSingleFile() throws IOException {
     String destPath = workDir + "/test.rf";
     fs.create(new Path(destPath));
 
-    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context,
-        Collections.singletonList(new Path(destPath)), null, null, false)) {
-      while (rli.hasNext()) {
-        fail(
-            "Finish marker should not be found for a single file. Exception should have been thrown.");
-      }
-    } catch (IOException e) {
-      fs.delete(new Path(destPath));
-      // Expected exception
-    }
+    assertThrows("Finish marker should not be found for a single file.", IOException.class,
+        () -> new RecoveryLogsIterator(context, Collections.singletonList(new Path(destPath)), null,
+            null, false));
+
+    fs.delete(new Path(destPath));

Review comment:
       You don't even need to delete the file, since the folder is temporary.







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r664895009



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       Also, if a user specifies a single sorted WAL to read, it will error out (the second part of your issue mentions this). I will look into adding a check to prevent this, though a more descriptive error message could suffice. 







[GitHub] [accumulo] milleruntime commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665409143



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       Gotcha. I think making the user pass in the directory is fine. It should print the error if the directory doesn't have the finished file, correct? Reading a partially sorted directory is undesirable, so that behavior should work.







[GitHub] [accumulo] milleruntime commented on pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
milleruntime commented on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-876367995


   > Thinking about this a bit, it's a little weird that we can't read a single sorted WAL file. We can read a directory containing one, but we can't read it by itself? That doesn't seem to be a useful restriction for this utility, which is primarily intended to be a troubleshooting utility. 
   
   This is a restriction of the underlying code, not the utility. The `RecoveryLogsIterator` validates the directory while getting the sorted files to read. This is done here: https://github.com/apache/accumulo/blob/3e765b1f3b487092206cee036097820c37b7907e/server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java#L125-L146
   
   > If we pass in a single RFile, I feel like this tool should still be able to read it. That could be done here or in a separate PR, unless there's a reason to not support that at all.
   
   Reading a sorted WAL RFile whose sort has not completed has limited utility. Supporting it would require refactoring critical code in the already complicated recovery process, which may not be worth the trouble. An admin can still run `accumulo rfile-info` on these files to get information about them, or read the original WAL to get the whole picture.
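As a rough illustration of the restriction described above, the sketch below rejects any input that is not a directory. It also rejects a directory that lacks a finish marker file, and only then lists the sorted `.rf` files inside it. It uses `java.nio.file` in place of Hadoop's `FileSystem`. The class name and the marker file name `finished` are assumptions for illustration. The real validation is the `getFiles` logic in `RecoveryLogsIterator` at the linked lines.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SortedLogDirSketch {
  // Assumed name of the marker file written once sorting completes.
  static final String FINISH_MARKER = "finished";

  private SortedLogDirSketch() {}

  /** Returns the sorted .rf files in dir, or throws if dir is not a completed sort. */
  static List<Path> sortedFiles(Path dir) throws IOException {
    if (!Files.isDirectory(dir)) {
      // A single RFile passed directly fails here: it cannot contain a marker.
      throw new IOException(dir + " is not a directory; pass in a recovery directory");
    }
    if (!Files.exists(dir.resolve(FINISH_MARKER))) {
      throw new IOException("Sorting of " + dir + " did not finish: no finish marker found");
    }
    List<Path> files = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, "*.rf")) {
      for (Path file : stream) {
        files.add(file);
      }
    }
    Collections.sort(files); // deterministic order for callers
    return files;
  }
}
```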





[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r664895009



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       Also, if a user specifies a single sorted WAL to read, it will error out (the second part of your issue mentions this). I will look into adding a check to prevent this (something like `if (file.endsWith(".rf"))`), though a more descriptive error message could suffice.







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r660769786



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/RecoveryLogsIterator.java
##########
@@ -56,42 +56,57 @@
   private final List<Scanner> scanners;
   private final Iterator<Entry<Key,Value>> iter;
 
+  public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs,
+      boolean checkFirstKey) throws IOException {
+    this(context, recoveryLogDirs, null, null, checkFirstKey);
+  }
+
   /**
    * Scans the files in each recoveryLogDir over the range [start,end].
    */
-  RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
+  public RecoveryLogsIterator(ServerContext context, List<Path> recoveryLogDirs, LogFileKey start,
       LogFileKey end, boolean checkFirstKey) throws IOException {
 
     List<Iterator<Entry<Key,Value>>> iterators = new ArrayList<>(recoveryLogDirs.size());
     scanners = new ArrayList<>();
-    Range range = LogFileKey.toRange(start, end);
+    Range range = start != null ? LogFileKey.toRange(start, end) : null;
     var vm = context.getVolumeManager();
 
     for (Path logDir : recoveryLogDirs) {
       LOG.debug("Opening recovery log dir {}", logDir.getName());
-      List<Path> logFiles = getFiles(vm, logDir);
       var fs = vm.getFileSystemByPath(logDir);
 
-      // only check the first key once to prevent extra iterator creation and seeking
-      if (checkFirstKey) {
-        validateFirstKey(context, fs, logFiles, logDir);
-      }
-
-      for (Path log : logFiles) {
-        var scanner = RFile.newScanner().from(log.toString()).withFileSystem(fs)
-            .withTableProperties(context.getConfiguration()).build();
-
-        scanner.setRange(range);
+      // if path passed in is actually a file, read a single RFile
+      if (vm.getFileStatus(logDir).isFile()) {

Review comment:
       I will look for alternatives. I have to work around a few inconsistencies between the old code and the new.







[GitHub] [accumulo] Manno15 commented on pull request #2181: WIP: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-873046612


   @milleruntime I confirmed in my testing that the OOM still happens even with a range set. It really depends on how much information (how many LogEvents) we want to show, but to be safe, I think it would be best to raise the default heap size.





[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665359721



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       A single RFile will not currently work with RLI. It doesn't contain the finish marker that `getFiles` in RLI looks for (that seems to be the only reason it doesn't work). Also, since LogReader loops over the paths the user passes in and usually handles one directory/file at a time, I am already using a singleton list to read in each path. I could change the passed-in files (`opts.files` in LogReader) to a `List<Path>`, but it doesn't seem necessary.







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665422163



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       My latest commit adds an extra error message if it detects a file whose name ends with `.rf`. Otherwise, yes, if the input gets past that check but doesn't contain the finished file, it will error out.
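The input handling described here (and the earlier suggestion to keep this check in LogReader rather than in `RecoveryLogsIterator`) might look roughly like the sketch below. It is hypothetical: `java.nio.file` stands in for Hadoop's `FileSystem`, and the class name, method name, and exact messages are illustrative, not the PR's code. Directories pass through, because the finish marker is validated later by the iterator.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

final class LogReaderInputSketch {
  private LogReaderInputSketch() {}

  /**
   * Returns an error message if the user-supplied path cannot be read as a
   * recovery directory, or empty if it should be handed to the iterator.
   */
  static Optional<String> checkInput(Path input) {
    if (Files.isDirectory(input)) {
      return Optional.empty(); // finish marker is checked later, by the iterator
    }
    Path name = input.getFileName();
    if (name != null && name.toString().endsWith(".rf")) {
      // More specific message for the common mistake of passing one sorted file
      return Optional.of("Found a single sorted RFile: " + input
          + ". Please pass in the recovery directory that contains it");
    }
    return Optional.of("No directory was given. Please pass in a recovery directory");
  }
}
```

In the loop over `opts.files`, a non-empty result would be logged with `log.error` followed by `continue`, matching the existing diff.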







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665682442



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -35,11 +36,13 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.RecoveryLogReader;

Review comment:
       That was deleted on my end. Interesting. 







[GitHub] [accumulo] Manno15 commented on pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#issuecomment-876443472


   I will work on adding the unit test to this PR.





[GitHub] [accumulo] milleruntime commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
milleruntime commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r664857196



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       This looks good for reading the sorted logs, but we still want the section under `if (fs.getFileStatus(path).isFile())` for reading regular write-ahead logs using `DfsLogger`.
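For reference, the raw-WAL branch being discussed reads entries by reusing one key/value object and calling `readFields()` until the stream ends, treating `EOFException` as the normal end of the log. The following self-contained sketch shows that pattern in isolation. `ToyLogEntry` and `RawLogScan` are hypothetical stand-ins for `LogFileKey`/`LogFileValue` and the decrypting `DfsLogger` stream, neither of which is reproduced here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a LogFileKey/LogFileValue pair: a Writable-style
// record that serializes with write() and repopulates itself with readFields().
final class ToyLogEntry {
  int tabletId;
  String event = "";

  ToyLogEntry() {}

  ToyLogEntry(String event, int tabletId) {
    this.event = event;
    this.tabletId = tabletId;
  }

  void write(DataOutput out) throws IOException {
    out.writeUTF(event);
    out.writeInt(tabletId);
  }

  void readFields(DataInput in) throws IOException {
    event = in.readUTF();
    tabletId = in.readInt();
  }

  @Override
  public String toString() {
    return event + " " + tabletId;
  }
}

final class RawLogScan {
  private RawLogScan() {}

  // Serializes entries back to back, with no framing, like a raw log stream.
  static byte[] encode(ToyLogEntry... entries) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (ToyLogEntry e : entries) {
        e.write(out);
      }
    }
    return bytes.toByteArray();
  }

  // Mirrors the loop in the removed isFile() branch: reuse one object, call
  // readFields() until the stream runs out, and treat EOFException as "done".
  static List<String> readAll(byte[] log) throws IOException {
    List<String> events = new ArrayList<>();
    ToyLogEntry entry = new ToyLogEntry();
    try (DataInputStream input = new DataInputStream(new ByteArrayInputStream(log))) {
      while (true) {
        try {
          entry.readFields(input);
        } catch (EOFException ex) {
          break;
        }
        events.add(entry.toString()); // LogReader calls printLogEvent(...) at this point
      }
    }
    return events;
  }
}
```

As with the original loop, a record that is cut off partway through also ends in `EOFException`, so a truncated tail is silently dropped rather than reported.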







[GitHub] [accumulo] Manno15 commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
Manno15 commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665359721



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -123,36 +122,17 @@ public void execute(String[] args) throws Exception {
       Set<Integer> tabletIds = new HashSet<>();
 
       for (String file : opts.files) {
-
         Path path = new Path(file);
-        LogFileKey key = new LogFileKey();
-        LogFileValue value = new LogFileValue();
-
-        if (fs.getFileStatus(path).isFile()) {
-          // read log entries from a simple hdfs file
-          try (final FSDataInputStream fsinput = fs.open(path);
-              DataInputStream input = DfsLogger.getDecryptingStream(fsinput, siteConfig)) {
-            while (true) {
-              try {
-                key.readFields(input);
-                value.readFields(input);
-              } catch (EOFException ex) {
-                break;
-              }
-              printLogEvent(key, value, row, rowMatcher, ke, tabletIds, opts.maxMutations);
-            }
-          } catch (LogHeaderIncompleteException e) {
-            log.warn("Could not read header for {} . Ignoring...", path);
-            continue;
-          }
-        } else {
-          // read the log entries sorted in a map file
-          try (RecoveryLogReader input = new RecoveryLogReader(fs, path)) {
-            while (input.hasNext()) {
-              Entry<LogFileKey,LogFileValue> entry = input.next();
-              printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
-                  opts.maxMutations);
-            }
+        if (!fs.getFileStatus(path).isDirectory()) {
+          log.error("No directory was given. Please pass in a recovery directory");
+          continue;
+        }
+        // read the log entries sorted in a RFile
+        try (var rli = new RecoveryLogsIterator(context, Collections.singletonList(path), true)) {
+          while (rli.hasNext()) {
+            Entry<LogFileKey,LogFileValue> entry = rli.next();
+            printLogEvent(entry.getKey(), entry.getValue(), row, rowMatcher, ke, tabletIds,
+                opts.maxMutations);

Review comment:
       A single RFile will not work with RLI currently. It doesn't contain the finish marker that `rli.getFiles` looks for (that seems to be the only reason it doesn't work). Also, since LogReader loops over the paths the user passes in and handles one directory/file at a time, I'm already using a singleton list to read in each path. I could change the passed-in files (`opts.files` in LogReader) to be a `List<Path>`, but that doesn't seem necessary.
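The following sketch illustrates why a lone RFile fails: the file listing only accepts a directory that contains a finish marker. It is hypothetical and makes several assumptions. The marker is assumed to be a file named `finished` inside the sorted directory; the real name comes from `SortedLogState`, which is not shown here. It uses `java.nio.file` instead of Hadoop's `FileSystem`. `SortedRecoveryDir` and `sortedFiles` are made-up names and are not the RLI API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of a finish-marker check; the marker name "finished"
// is an assumption about how sorted recovery directories are laid out.
final class SortedRecoveryDir {
  static final String FINISH_MARKER = "finished";

  private SortedRecoveryDir() {}

  /** Returns the .rf files in dir in sorted order, or throws if the sort never finished. */
  static List<Path> sortedFiles(Path dir) throws IOException {
    if (!Files.isRegularFile(dir.resolve(FINISH_MARKER))) {
      throw new IOException("Sorted log directory " + dir + " has no " + FINISH_MARKER + " marker");
    }
    try (Stream<Path> children = Files.list(dir)) {
      return children
          .filter(p -> p.getFileName().toString().endsWith(".rf"))
          .sorted()
          .collect(Collectors.toList());
    }
  }
}
```

If a single `.rf` file is passed in, `dir.resolve(FINISH_MARKER)` points beneath the file itself, so the marker check fails the same way described in the comment above.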







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r669076996



##########
File path: server/tserver/src/test/java/org/apache/accumulo/tserver/log/RecoveryLogsIteratorTest.java
##########
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static org.apache.accumulo.tserver.logger.LogEvents.DEFINE_TABLET;
+import static org.apache.accumulo.tserver.logger.LogEvents.OPEN;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.crypto.CryptoServiceFactory;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.tserver.logger.LogFileKey;
+import org.apache.accumulo.tserver.logger.LogFileValue;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
+@SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "paths not set by user input")
+public class RecoveryLogsIteratorTest {
+
+  private VolumeManager fs;
+  private File workDir;
+  static final KeyExtent extent = new KeyExtent(TableId.of("table"), null, null);
+  static ServerContext context;
+  static LogSorter logSorter;
+
+  @Rule
+  public TemporaryFolder tempFolder =
+      new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
+
+  @Before
+  public void setUp() throws Exception {
+    context = createMock(ServerContext.class);
+    logSorter = new LogSorter(context, DefaultConfiguration.getInstance());
+
+    workDir = tempFolder.newFolder();
+    String path = workDir.getAbsolutePath();
+    assertTrue(workDir.delete());
+    fs = VolumeManagerImpl.getLocalForTesting(path);
+    expect(context.getVolumeManager()).andReturn(fs).anyTimes();
+    expect(context.getCryptoService()).andReturn(CryptoServiceFactory.newDefaultInstance())
+        .anyTimes();
+    expect(context.getConfiguration()).andReturn(DefaultConfiguration.getInstance()).anyTimes();
+    replay(context);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    fs.close();
+  }
+
+  static class KeyValue implements Comparable<KeyValue> {
+    public final LogFileKey key;
+    public final LogFileValue value;
+
+    KeyValue() {
+      key = new LogFileKey();
+      value = new LogFileValue();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hashCode(key) + Objects.hashCode(value);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return this == obj || (obj instanceof KeyValue && 0 == compareTo((KeyValue) obj));
+    }
+
+    @Override
+    public int compareTo(KeyValue o) {
+      return key.compareTo(o.key);
+    }
+  }
+
+  @Test
+  public void testSimpleRLI() throws IOException {
+    KeyValue keyValue = new KeyValue();
+    keyValue.key.event = DEFINE_TABLET;
+    keyValue.key.seq = 0;
+    keyValue.key.tabletId = 1;
+    keyValue.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, true);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) {
+      while (rli.hasNext()) {
+        Entry<LogFileKey,LogFileValue> entry = rli.next();
+        assertEquals("TabletId does not match", 1, entry.getKey().tabletId);
+        assertEquals("Event does not match", DEFINE_TABLET, entry.getKey().event);
+      }
+    }
+  }
+
+  @Test
+  public void testFinishMarker() throws IOException {
+    KeyValue keyValue = new KeyValue();
+    keyValue.key.event = DEFINE_TABLET;
+    keyValue.key.seq = 0;
+    keyValue.key.tabletId = 1;
+    keyValue.key.tablet = extent;
+
+    KeyValue[] keyValues = {keyValue};
+
+    Map<String,KeyValue[]> logs = new TreeMap<>();
+    logs.put("keyValues", keyValues);
+
+    ArrayList<Path> dirs = new ArrayList<>();
+
+    createRecoveryDir(logs, dirs, false);
+
+    try (RecoveryLogsIterator rli = new RecoveryLogsIterator(context, dirs, null, null, false)) {
+      while (rli.hasNext()) {
+        fail("Finish marker should not be found. Exception should have been thrown.");
+      }
+    } catch (IOException e) {
+      // Expected exception
+    }
+  }

Review comment:
       It may be possible to use assertThrows here, something like:
   ```suggestion
       assertThrows("Finish marker should not be found", IOException.class, () -> new RecoveryLogsIterator(context, dirs, null, null, false));
     }
   ```
   This code would be slightly different if the expected exception is thrown in the `hasNext()` method instead of in the constructor, but the idea is the same.
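The following self-contained sketch shows the difference between an exception thrown by the constructor and one thrown by `hasNext()`. So that it runs without JUnit on the classpath, it uses `expectThrows`, a minimal stand-in modeled on JUnit 4.13's `Assert.assertThrows(String, Class, ThrowingRunnable)`. `MarkerCheckingIterator` is hypothetical and is not `RecoveryLogsIterator`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class ThrowsSketch {
  private ThrowsSketch() {}

  /** Same shape as JUnit 4.13's ThrowingRunnable. */
  interface ThrowingRunnable {
    void run() throws Throwable;
  }

  /** Minimal stand-in for org.junit.Assert.assertThrows(String, Class, ThrowingRunnable). */
  static <T extends Throwable> T expectThrows(String message, Class<T> expected,
      ThrowingRunnable r) {
    try {
      r.run();
    } catch (Throwable t) {
      if (expected.isInstance(t)) {
        return expected.cast(t);
      }
      throw new AssertionError(message + ": unexpected " + t, t);
    }
    throw new AssertionError(message + ": nothing was thrown");
  }

  /** Hypothetical iterator that fails eagerly (constructor) or lazily (hasNext). */
  static final class MarkerCheckingIterator implements Iterator<String>, AutoCloseable {
    private final boolean hasMarker;

    MarkerCheckingIterator(boolean hasMarker, boolean checkEagerly) throws IOException {
      this.hasMarker = hasMarker;
      if (checkEagerly && !hasMarker) {
        throw new IOException("finish marker not found");
      }
    }

    @Override
    public boolean hasNext() {
      if (!hasMarker) {
        // Iterator.hasNext() cannot throw a checked IOException, so wrap it.
        throw new UncheckedIOException(new IOException("finish marker not found"));
      }
      return false;
    }

    @Override
    public String next() {
      throw new NoSuchElementException();
    }

    @Override
    public void close() {}
  }
}
```

Because `Iterator.hasNext()` cannot declare a checked `IOException`, a lazy check has to surface as an unchecked exception. The expected class in the assertion therefore changes along with the lambda body. For the eager case the call is `expectThrows("...", IOException.class, () -> new MarkerCheckingIterator(false, true))`. For the lazy case, the lambda opens the iterator in a try-with-resources block and calls `hasNext()`.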







[GitHub] [accumulo] ctubbsii commented on a change in pull request #2181: Update LogReader to utilize RecoveryLogsIterator

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on a change in pull request #2181:
URL: https://github.com/apache/accumulo/pull/2181#discussion_r665723147



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/logger/LogReader.java
##########
@@ -35,11 +36,13 @@
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.start.spi.KeywordExecutable;
 import org.apache.accumulo.tserver.log.DfsLogger;
 import org.apache.accumulo.tserver.log.DfsLogger.LogHeaderIncompleteException;
-import org.apache.accumulo.tserver.log.RecoveryLogReader;

Review comment:
       The subject line, and ultimately the commit message when it is merged, will provide the narrative that frames the scope of the work done in the commit. Currently, the subject line says "Update LogReader to utilize RecoveryLogsIterator". If there were room for a longer description, it could just as well have also said "... instead of RecoveryLogReader", because that's what the code ended up doing. So, I would consider removing the unused code to be part of this same change set and same scope of work. I don't see any reason to defer it to a future PR, because that would be like leaving commented-out or unused code in place. But you gotta use your best judgment. Sometimes it's not clear whether it's better to do it as a follow-on or not. In this case, I think removing the unused code should be included in the change.



