You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/08/06 19:46:24 UTC

[GitHub] keith-turner commented on a change in pull request #574: Add tool to swap out and quarantine corrupt logs

keith-turner commented on a change in pull request #574: Add tool to swap out and quarantine corrupt logs
URL: https://github.com/apache/accumulo/pull/574#discussion_r208007998
 
 

 ##########
 File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/CorruptWalReplacer.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.primitives.Bytes;
+
+public class CorruptWalReplacer {
+
+  private static final Logger log = LoggerFactory.getLogger(CorruptWalReplacer.class);
+  private static final byte[] EMPTY_WAL_CONTENT = Bytes
+      .concat("--- Log File Header (v2) ---".getBytes(UTF_8), new byte[] {0, 0, 0, 0});
+
+  private Connector connector;
+  private String quarantineDir;
+  private String workDir;
+
+  /**
+   * Creates a replacer that talks to the given Accumulo instance.
+   *
+   * @param instanceName
+   *          name of the Accumulo instance, passed to {@link ZooKeeperInstance}
+   * @param zooKeepers
+   *          ZooKeeper connection string, passed to {@link ZooKeeperInstance}
+   * @param user
+   *          user used to obtain the {@link Connector}
+   * @param password
+   *          password for {@code user}, wrapped in a {@link PasswordToken}
+   * @param quarantineDir
+   *          directory that unrecoverable WALs are moved into
+   * @param workDir
+   *          scratch directory used while sorting logs; it is deleted by {@code run()}
+   * @throws AccumuloSecurityException
+   *           propagated from {@code getConnector}
+   * @throws AccumuloException
+   *           propagated from {@code getConnector}
+   */
+  public CorruptWalReplacer(String instanceName, String zooKeepers, String user, String password,
+      String quarantineDir, String workDir) throws AccumuloSecurityException, AccumuloException {
+    this.quarantineDir = quarantineDir;
+    this.workDir = workDir;
+
+    ZooKeeperInstance zkInstance = new ZooKeeperInstance(instanceName, zooKeepers);
+    PasswordToken credentials = new PasswordToken(password);
+    this.connector = zkInstance.getConnector(user, credentials);
+  }
+
+  /**
+   * Simulates log recovery for every extent returned by {@code getExtents()} and quarantines the
+   * write-ahead log that makes recovery fail.
+   *
+   * <p>
+   * For each extent that has logs, every log is sorted into the working directory and marked as
+   * finished, then a recovery is run against the sorted copies. If recovery throws a
+   * {@link LogRecoveryException}, the log named by the exception is renamed into
+   * {@code <quarantineDir>/<url-encoded extent>/<current millis>/<log name>}, an empty WAL
+   * ({@code EMPTY_WAL_CONTENT}) is written at the original location, and any directory for that
+   * log under {@code /accumulo/recovery} is deleted. The working directory is removed before and
+   * after each extent is processed.
+   *
+   * @throws Exception
+   *           if any file system, sort, or metadata operation fails, or if the failing log cannot
+   *           be mapped back to its source path
+   */
+  public void run() throws Exception {
+    VolumeManager fs = VolumeManagerImpl.get();
+    Path quarantineDir = new Path(this.quarantineDir);
+    Path workDir = new Path(this.workDir);
+
+    for (KeyExtent extent : getExtents()) {
+      log.info("Checking logs for extent {}", extent);
+      List<Path> logs = getLogs(extent);
+
+      if (logs.isEmpty()) {
+        log.info("No logs found for key {}", extent);
+        continue;
+      }
+
+      // Clean up the working directory, if it exists
+      ArrayList<Path> dirs = new ArrayList<>();
+      if (fs.exists(workDir)) {
+        log.info("Deleting {}", workDir);
+        fs.deleteRecursively(workDir);
+      }
+
+      try {
+        // Map containing the name of the file to the full path
+        Map<String,Path> nameToSource = new HashMap<>();
+        for (Path path : logs) {
+          Path destPath = new Path(workDir, path.getName());
+
+          // Run the log sorter task to prepare for recovery
+          LogSorter.LogSorterTask task = new LogSorter.LogSorterTask(fs,
+              AccumuloConfiguration.getDefaultConfiguration());
+
+          log.info("Invoking sort from path {} to path {}", path, destPath);
+          task.sort(path.getName(), path, destPath.toString());
+
+          log.info("Creating finished marker at {}",
+              SortedLogState.getFinishedMarkerPath(destPath));
+          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+          dirs.add(destPath);
+          nameToSource.put(path.getName(), path);
+          log.info("Mapped {} to {}", path.getName(), path);
+        }
+
+        try {
+          log.info("Starting 'recovery' process");
+          SortedLogRecovery recovery = new SortedLogRecovery(fs);
+          CaptureMutations capture = new CaptureMutations();
+          recovery.recover(extent, dirs, new HashSet<String>(), capture);
+          log.info("Logs are good; key {} has {} mutations", extent, capture.getNumMutations());
+        } catch (LogRecoveryException e) {
+          // An exception occurred; one of the logs was unrecoverable.
+          // Pass the exception itself so the stack trace is not lost.
+          log.error(e.toString(), e);
+
+          // Determine the path to quarantine the file
+          Path quarantinePath = new Path(String.format("%s/%s/%s/%s", quarantineDir,
+              URLEncoder.encode(extent.toString(), "UTF-8"), System.currentTimeMillis(),
+              e.getLog()));
+          Path source = nameToSource.get(e.getLog());
+
+          if (source == null) {
+            throw new RuntimeException("Failed to quarantine; source is null");
+          }
+
+          // Create the quarantine path's parent directory
+          if (!fs.mkdirs(quarantinePath.getParent())) {
+            log.error("Failed to mkdirs {}; not creating empty WAL", quarantinePath.getParent());
+            continue;
+          }
+
+          // Move the unrecoverable WAL to quarantine
+          log.info("Moving {} to {}", source, quarantinePath);
+          if (!fs.rename(source, quarantinePath)) {
+            log.error("Failed to rename file into quarantine directory; not creating empty WAL");
+            continue;
+          }
+
+          // Write out the empty log; try-with-resources closes the stream even if write fails
+          log.info("Writing empty WAL to {}", source);
+          try (FSDataOutputStream out = fs.create(source)) {
+            out.write(EMPTY_WAL_CONTENT);
+          }
+
+          // Delete the old recovery path
+          Path oldRecoveryPath = new Path("/accumulo/recovery", e.getLog());
+          if (fs.exists(oldRecoveryPath)) {
+            log.info("Deleting old recovery path {}", oldRecoveryPath);
+            if (!fs.deleteRecursively(oldRecoveryPath)) {
+              log.error("Failed to delete path {}", oldRecoveryPath);
+            }
+          } else {
+            // The placeholder previously had no argument, so the path was never logged
+            log.warn("Old recovery path {} does not exist", oldRecoveryPath);
+          }
+        }
+      } finally {
+        // Clean up the working directory on exit
+        if (fs.exists(workDir)) {
+          log.info("Deleting {}", workDir);
+          fs.deleteRecursively(workDir);
+        }
+      }
+    }
+    log.info("Done");
+  }
+
+  private Set<KeyExtent> getExtents() throws TableNotFoundException {
+    log.info("Searching for 'log' entries in accumulo.metadata for key extents to check");
+
+    try (Scanner scanner = connector.createScanner("accumulo.metadata", new Authorizations())) {
 
 Review comment:
   This information could be obtained from the WALs by inspecting the define-tablet events in them.  However, this has the drawback that some of these tablets may no longer exist.  For example, tablets for deleted tables may still exist in the WALs.  Also, if a tablet splits, both the parent and its children may exist in the WALs.  I don't think these will cause problems, but they may result in extra work.
   
   Below is some code I sketched out while thinking through this.
   
   ```java
      // Sketch (pseudo-code): drive the check from the WALs of dead tservers.
      // Get live servers from ZK
      Set<TserverId> liveServers = getLiveTservers();
     
      // Get logs for dead servers from ZK
      Map<TserverId, List<Path>>  logsPerDeadTserver = 
          getWALogsForDeadServers(liveServers);
   
      logsPerDeadTserver.forEach((tserverId, walogs) -> {
        // only sort logs once per tserver
        List<Path> sortedWals = sortWALs(walogs);
   
        // Examine tablet definition events in logs and get unique set
        // of extents.  Getting this info from sorted logs is efficient.
        Set<KeyExtent> extents = getExtentsFromWALs(sortedWals);
   
        // Run a simulated recovery for each extent against the same sorted logs
        for(KeyExtent extent : extents) {
           simulateRecovery(extent, sortedWals);
        }
      });
   ```
   
   While thinking about the non-existent tablets, I started thinking about scanning the metadata table again.  Scanning the metadata table would avoid the non-existent tablets I mentioned above.  However, doing it properly would require a three-step approach that is a hybrid of ZK and the metadata table (outlined further down in an example function).  This three-step approach has a very nice side effect: if the root tablet is not recovering, the tool would focus only on that.  Below is some pseudo-code I wrote while thinking through this.
   
   ```java
      // Sketch (pseudo-code): scan ZK and the metadata table looking for tablets
      // with walogs, grouped by the unique set of walogs they reference.
     Map<Set<Path>, List<KeyExtent>> tabletsPerWalogSet = scanZKandMetadataForWals();
   
      tabletsPerWalogSet.forEach((walogs, extents) -> {
        // only sort logs once per unique set of walogs
        List<Path> sortedWals = sortWALs(walogs);
   
        // Run a simulated recovery for each extent that referenced this set of logs
        for(KeyExtent extent : extents) {
           simulateRecovery(extent, sortedWals);
        }
      });
   ```
   Below is the code for scanning ZK and then the metadata table.  In the case where the root tablet is not recovering, it would only return logs for the root tablet.  Likewise, when a metadata tablet is unhealthy, it would focus on that.
   
   ```java
   /**
    * Returns a map where the key is a unique set of logs seen in the metadata
    * table.  The value is the list of extents that had those logs.
    */
   private Map<Set<Path>, List<KeyExtent>> scanZKandMetadataForWals() {
    // TODO look in ZooKeeper and see if the root tablet has walogs.  If so, return
    // a map with just the info for the root tablet.  Cannot scan accumulo.root.
    // Checking the root tablet for walogs assumes the master is healthy and could
    // put them there.  The master has to notice a tserver is dead and set walogs
    // for the root tablet in ZK.  This function could be made more robust and also
    // check if the root tablet is assigned to a dead tserver and, if so, get the
    // walogs for that tserver (which is what a healthy master should do) from ZK.
   
     // TODO scan accumulo.root.  If any metadata tablets have walogs, then put
     // those in the return map.  If anything was found, cannot continue on to
     // scan accumulo.metadata; must return what was found.
   
     // TODO scan accumulo.metadata and return map with info about tablets with walogs.
   }
   ```
   
   Using a set as a map key is something I did to keep the pseudo-code brief; I am not sure how well that would work in practice.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services