You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2018/08/06 16:57:23 UTC

[GitHub] adamjshook commented on a change in pull request #574: Add tool to swap out and quarantine corrupt logs

adamjshook commented on a change in pull request #574: Add tool to swap out and quarantine corrupt logs
URL: https://github.com/apache/accumulo/pull/574#discussion_r207961128
 
 

 ##########
 File path: server/tserver/src/main/java/org/apache/accumulo/tserver/log/CorruptWalReplacer.java
 ##########
 @@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.tserver.log;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LogColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.primitives.Bytes;
+
+public class CorruptWalReplacer {
+
+  private static final Logger log = LoggerFactory.getLogger(CorruptWalReplacer.class);
+  private static final byte[] EMPTY_WAL_CONTENT = Bytes
+      .concat("--- Log File Header (v2) ---".getBytes(UTF_8), new byte[] {0, 0, 0, 0});
+
+  private Connector connector;
+  private String quarantineDir;
+  private String workDir;
+
+  public CorruptWalReplacer(String instanceName, String zooKeepers, String user, String password,
+      String quarantineDir, String workDir) throws AccumuloSecurityException, AccumuloException {
+    ZooKeeperInstance instance = new ZooKeeperInstance(instanceName, zooKeepers);
+    this.connector = instance.getConnector(user, new PasswordToken(password));
+    this.quarantineDir = quarantineDir;
+    this.workDir = workDir;
+  }
+
+  public void run() throws Exception {
+    VolumeManager fs = VolumeManagerImpl.get();
+    Path quarantineDir = new Path(this.quarantineDir);
+    Path workDir = new Path(this.workDir);
+
+    for (KeyExtent extent : getExtents()) {
+      log.info("Checking logs for extent {}", extent);
+      List<Path> logs = getLogs(extent);
+
+      if (logs.isEmpty()) {
+        log.info("No logs found for key {}", extent);
+        continue;
+      }
+
+      // Clean up the working directory, if it exists
+      ArrayList<Path> dirs = new ArrayList<>();
+      if (fs.exists(workDir)) {
+        log.info("Deleting {}", workDir);
+        fs.deleteRecursively(workDir);
+      }
+
+      try {
+        // Map containing the name of the file to the full path
+        Map<String,Path> nameToSource = new HashMap<>();
+        for (Path path : logs) {
+          Path destPath = new Path(workDir, path.getName());
+
+          // Run the log sorter task to prepare for recovery
+          LogSorter.LogSorterTask task = new LogSorter.LogSorterTask(fs,
+              AccumuloConfiguration.getDefaultConfiguration());
+
+          log.info("Invoking sort from path {} to path {}", path, destPath);
+          task.sort(path.getName(), path, destPath.toString());
+
+          log.info("Creating finished marker at {}",
+              SortedLogState.getFinishedMarkerPath(destPath));
+          fs.create(SortedLogState.getFinishedMarkerPath(destPath)).close();
+          dirs.add(destPath);
+          nameToSource.put(path.getName(), path);
+          log.info("Mapped {} to {}", path.getName(), path);
+        }
+
+        try {
+          log.info("Starting 'recovery' process");
+          SortedLogRecovery recovery = new SortedLogRecovery(fs);
+          CaptureMutations capture = new CaptureMutations();
+          recovery.recover(extent, dirs, new HashSet<String>(), capture);
+          log.info("Logs are good; key {} has {} mutations", extent, capture.getNumMutations());
+        } catch (LogRecoveryException e) {
+          // An exception occurred; one of the logs was unrecoverable
+          log.error(e.toString());
+
+          // Determine the path to quarantine the file
+          Path quarantinePath = new Path(String.format("%s/%s/%s/%s", quarantineDir,
+              URLEncoder.encode(extent.toString(), "UTF-8"), System.currentTimeMillis(),
+              e.getLog()));
+          Path source = nameToSource.get(e.getLog());
+
+          if (source == null) {
+            throw new RuntimeException("Failed to quarantine; source is null");
+          }
+
+          // Create the quarantine path's parent directory
+          if (!fs.mkdirs(quarantinePath.getParent())) {
+            log.error("Failed to mkdirs {}; not creating empty WAL", quarantinePath.getParent());
+            continue;
+          }
+
+          // Move the unrecoverable WAL to quarantine
+          log.info("Moving {} to {}", source, quarantinePath);
+          if (!fs.rename(source, quarantinePath)) {
+            log.error("Failed to rename file into quarantine directory; not creating empty WAL");
+            continue;
+          }
+
+          // Write out the empty log
+          log.info("Writing empty WAL to {}", source);
+          FSDataOutputStream out = fs.create(source);
+          out.write(EMPTY_WAL_CONTENT);
+          out.close();
+
+          // Delete the old recovery path
+          Path oldRecoveryPath = new Path("/accumulo/recovery", e.getLog());
+          if (fs.exists(oldRecoveryPath)) {
+            log.info("Deleting old recovery path {}", oldRecoveryPath);
+            if (!fs.deleteRecursively(oldRecoveryPath)) {
+              log.error("Failed to delete path {}", oldRecoveryPath);
+            }
+          } else {
+            log.warn("Old recovery path {} does not exist");
+          }
+        }
+      } finally {
+        // Clean up the working directory on exit
+        if (fs.exists(workDir)) {
+          log.info("Deleting {}", workDir);
+          fs.deleteRecursively(workDir);
+        }
+      }
+    }
+    log.info("Done");
+  }
+
+  private Set<KeyExtent> getExtents() throws TableNotFoundException {
+    log.info("Searching for 'log' entries in accumulo.metadata for key extents to check");
+
+    try (Scanner scanner = connector.createScanner("accumulo.metadata", new Authorizations())) {
 
 Review comment:
   @keith-turner I think I have the correct code to collect the open/closed logs that belong to a dead tserver from ZooKeeper.  The recovery code needs a `KeyExtent` though in addition to a collection of logs.  Any recommendations here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services