You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2021/08/23 16:42:25 UTC

[accumulo] branch main updated: Rework sorted WAL Upgrade code (#2231)

This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 87d4aa5  Rework sorted WAL Upgrade code (#2231)
87d4aa5 is described below

commit 87d4aa5d51c8dc9ac1bc6d714998228b21270411
Author: Mike Miller <mm...@apache.org>
AuthorDate: Mon Aug 23 12:42:16 2021 -0400

    Rework sorted WAL Upgrade code (#2231)
    
    * Follow-on work for #2185
    * Improve upgrade code in Upgrader9to10.dropSortedMapWALFiles()
---
 .../accumulo/manager/upgrade/Upgrader9to10.java    | 55 +++++++++---------
 .../manager/upgrade/Upgrader9to10Test.java         | 66 +++++++++++++++-------
 2 files changed, 74 insertions(+), 47 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java
index 597f88c..9334164 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java
@@ -108,7 +108,7 @@ import com.google.common.base.Preconditions;
  *
  * Sorted recovery was updated to use RFiles instead of map files. So to prevent issues during
  * tablet recovery, remove the old temporary map files and resort using RFiles. This is done in
- * {@link #dropSortedMapWALFiles(VolumeManager)}. For more information see the following issues:
+ * {@link #dropSortedMapWALFiles(ServerContext)}. For more information see the following issues:
  * <a href="https://github.com/apache/accumulo/issues/2117">#2117</a> and
  * <a href="https://github.com/apache/accumulo/issues/2179">#2179</a>
  */
@@ -134,6 +134,8 @@ public class Upgrader9to10 implements Upgrader {
     upgradeRootTabletMetadata(ctx);
     renameOldMasterPropsinZK(ctx);
     createExternalCompactionNodes(ctx);
+    // special case where old files need to be deleted
+    dropSortedMapWALFiles(ctx);
   }
 
   @Override
@@ -148,8 +150,6 @@ public class Upgrader9to10 implements Upgrader {
     upgradeRelativePaths(ctx, Ample.DataLevel.USER);
     upgradeDirColumns(ctx, Ample.DataLevel.USER);
     upgradeFileDeletes(ctx, Ample.DataLevel.USER);
-    // special case where old files need to be deleted
-    dropSortedMapWALFiles(ctx.getVolumeManager());
   }
 
   private void setMetaTableProps(ServerContext ctx) {
@@ -738,33 +738,36 @@ public class Upgrader9to10 implements Upgrader {
   /**
    * Remove old temporary map files to prevent problems during recovery.
    */
-  static void dropSortedMapWALFiles(VolumeManager vm) {
-    Path recoveryDir = new Path("/accumulo/recovery");
-    try {
-      if (!vm.exists(recoveryDir)) {
-        log.info("There are no recovery files in /accumulo/recovery");
-        return;
-      }
-      List<Path> directoriesToDrop = new ArrayList<>();
-      for (FileStatus walDir : vm.listStatus(recoveryDir)) {
-        // map files will be in a directory starting with "part"
-        Path walDirPath = walDir.getPath();
-        for (FileStatus dirOrFile : vm.listStatus(walDirPath)) {
-          if (dirOrFile.isDirectory()) {
-            directoriesToDrop.add(walDirPath);
-            break;
+  static void dropSortedMapWALFiles(ServerContext context) {
+    VolumeManager vm = context.getVolumeManager();
+    for (String recoveryDir : context.getRecoveryDirs()) {
+      Path recoveryDirPath = new Path(recoveryDir);
+      try {
+        if (!vm.exists(recoveryDirPath)) {
+          log.info("There are no recovery files in {}", recoveryDir);
+          continue;
+        }
+        List<Path> directoriesToDrop = new ArrayList<>();
+        for (FileStatus walDir : vm.listStatus(recoveryDirPath)) {
+          // map files will be in a directory starting with "part"
+          Path walDirPath = walDir.getPath();
+          for (FileStatus dirOrFile : vm.listStatus(walDirPath)) {
+            if (dirOrFile.isDirectory()) {
+              directoriesToDrop.add(walDirPath);
+              break;
+            }
           }
         }
-      }
-      if (!directoriesToDrop.isEmpty()) {
-        log.info("Found {} old sorted map directories to delete.", directoriesToDrop.size());
-        for (Path dir : directoriesToDrop) {
-          log.info("Deleting everything in old sorted map directory: {}", dir);
-          vm.deleteRecursively(dir);
+        if (!directoriesToDrop.isEmpty()) {
+          log.info("Found {} old sorted map directories to delete.", directoriesToDrop.size());
+          for (Path dir : directoriesToDrop) {
+            log.info("Deleting everything in old sorted map directory: {}", dir);
+            vm.deleteRecursively(dir);
+          }
         }
+      } catch (IOException ioe) {
+        throw new UncheckedIOException(ioe);
       }
-    } catch (IOException ioe) {
-      throw new UncheckedIOException(ioe);
     }
   }
 }
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java
index 25800da..8e5ef28 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java
@@ -24,7 +24,6 @@ import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -32,15 +31,21 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
@@ -49,9 +54,14 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.core.volume.VolumeImpl;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.gc.GcVolumeUtil;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.Test;
@@ -330,40 +340,54 @@ public class Upgrader9to10Test {
 
   @Test
   public void testDropSortedMapWALs() throws IOException {
-    Path recoveryDir = new Path("/accumulo/recovery");
-    VolumeManager fs = createMock(VolumeManager.class);
+    Configuration hadoopConf = new Configuration();
+    ConfigurationCopy conf = new ConfigurationCopy();
+    FileSystem fs = new Path("file:///").getFileSystem(hadoopConf);
+
+    List<String> volumes = Arrays.asList("/vol1/", "/vol2/");
+    Collection<Volume> vols =
+        volumes.stream().map(s -> new VolumeImpl(fs, s)).collect(Collectors.toList());
+    Set<String> fullyQualifiedVols = Set.of("file://vol1/", "file://vol2/");
+    Set<String> recoveryDirs =
+        Set.of("file://vol1/accumulo/recovery", "file://vol2/accumulo/recovery");
+    conf.set(Property.INSTANCE_VOLUMES, String.join(",", fullyQualifiedVols));
+
+    ServerContext context = createMock(ServerContext.class);
+    Path recoveryDir1 = new Path("file://vol1/accumulo/recovery");
+    Path recoveryDir2 = new Path("file://vol2/accumulo/recovery");
+    VolumeManager volumeManager = createMock(VolumeManager.class);
+
     FileStatus[] dirs = new FileStatus[2];
     dirs[0] = createMock(FileStatus.class);
-    Path dir0 = new Path("/accumulo/recovery/A123456789");
+    Path dir0 = new Path("file://vol1/accumulo/recovery/A123456789");
     FileStatus[] dir0Files = new FileStatus[1];
     dir0Files[0] = createMock(FileStatus.class);
     dirs[1] = createMock(FileStatus.class);
-    Path dir1 = new Path("/accumulo/recovery/B123456789");
+    Path dir1 = new Path("file://vol1/accumulo/recovery/B123456789");
     FileStatus[] dir1Files = new FileStatus[1];
     dir1Files[0] = createMock(FileStatus.class);
-    Path part1Dir = new Path("/accumulo/recovery/B123456789/part-r-0000");
+    Path part1Dir = new Path("file://vol1/accumulo/recovery/B123456789/part-r-0000");
+
+    expect(context.getVolumeManager()).andReturn(volumeManager).once();
+    expect(context.getConfiguration()).andReturn(conf).once();
+    expect(context.getHadoopConf()).andReturn(hadoopConf).once();
+    expect(context.getRecoveryDirs()).andReturn(recoveryDirs).once();
+    expect(volumeManager.getVolumes()).andReturn(vols).once();
 
-    expect(fs.exists(recoveryDir)).andReturn(true).once();
-    expect(fs.listStatus(recoveryDir)).andReturn(dirs).once();
+    expect(volumeManager.exists(recoveryDir1)).andReturn(true).once();
+    expect(volumeManager.exists(recoveryDir2)).andReturn(false).once();
+    expect(volumeManager.listStatus(recoveryDir1)).andReturn(dirs).once();
     expect(dirs[0].getPath()).andReturn(dir0).once();
-    expect(fs.listStatus(dir0)).andReturn(dir0Files).once();
+    expect(volumeManager.listStatus(dir0)).andReturn(dir0Files).once();
     expect(dir0Files[0].isDirectory()).andReturn(false).once();
 
     expect(dirs[1].getPath()).andReturn(dir1).once();
-    expect(fs.listStatus(dir1)).andReturn(dir1Files).once();
+    expect(volumeManager.listStatus(dir1)).andReturn(dir1Files).once();
     expect(dir1Files[0].isDirectory()).andReturn(true).once();
     expect(dir1Files[0].getPath()).andReturn(part1Dir).once();
+    expect(volumeManager.deleteRecursively(dir1)).andReturn(true).once();
 
-    expect(fs.deleteRecursively(dir1)).andReturn(true).once();
-
-    replay(fs, dirs[0], dirs[1], dir0Files[0], dir1Files[0]);
-    Upgrader9to10.dropSortedMapWALFiles(fs);
-
-    reset(fs);
-
-    // test case where there is no recovery
-    expect(fs.exists(recoveryDir)).andReturn(false).once();
-    replay(fs);
-    Upgrader9to10.dropSortedMapWALFiles(fs);
+    replay(context, volumeManager, dirs[0], dirs[1], dir0Files[0], dir1Files[0]);
+    Upgrader9to10.dropSortedMapWALFiles(context);
   }
 }