Posted to notifications@accumulo.apache.org by "dlmarion (via GitHub)" <gi...@apache.org> on 2023/11/16 18:44:43 UTC

[PR] Cleanup failed compaction tmp files [accumulo]

dlmarion opened a new pull request, #3955:
URL: https://github.com/apache/accumulo/pull/3955

   Modified TabletNameGenerator.getNextDataFilenameForMajc to add the external compaction id to the filename. When a compaction fails, or the DeadCompactionDetector notices that a Compactor is down, CompactionCoordinator.compactionFailed is called to remove the external compaction id from the tablet metadata. Modified this method to also remove the corresponding tmp files from the tablet directory when the external compaction id is successfully removed from the tablet metadata. Created the FindCompactionTmpFiles utility to find, and optionally delete, compaction tmp files at a system level. This utility requires that no Compactors are running, to avoid any collisions.
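   
   A minimal sketch of the resulting naming convention, pieced together from the diffs quoted later in this thread (`newFile` and `ecid` are assumed variables of type ReferencedTabletFile and ExternalCompactionId):
   
   ```java
   // Hypothetical sketch: the external compaction id becomes part of the
   // compaction tmp file name, e.g.
   //   .../tables/1/default_tablet/C00001.rf_tmp_ECID-<uuid>
   // so leftover files from a failed compaction can be matched to their ecid.
   String tmpFileName = newFile.insert().getMetadataPath() + "_tmp_" + ecid.canonical();
   ```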
   
   Fixes #3577


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1406374367


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static Set<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final Set<Path> matches = new ConcurrentSkipListSet<>();
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.trace("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.trace("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+          });
+    }
+    LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> {
+        if (context.getVolumeManager().exists(p)) {
+          return context.getVolumeManager().delete(p);
+        }
+        return true;
+      }));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }

Review Comment:
   Could adjust the log statement to count all futures that are not done and drop the expectedResponses variable.
   
   ```java
   LOG.debug("Waiting on {} background delete operations", futures.stream().filter(f->!f.isDone()).count());
   ```





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1396406969


##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1068,6 +1069,38 @@ void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
                     .findFirst().map(Map.Entry::getKey).orElse(null);
             LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
           }
+        } else {
+
+          // compactionFailed is called from the Compactor when either a compaction fails or
+          // is cancelled and it's called from the DeadCompactionDetector. This block is
+          // entered when the conditional mutator above successfully deletes an ecid from
+          // the tablet metadata. Remove compaction tmp files from the tablet directory
+          // that have a corresponding ecid in the name.
+          String dirName = ctx.getAmple().readTablet(extent, ColumnType.DIR).getDirName();

Review Comment:
   ExternalCompaction_3_IT.testMergeCancelsExternalCompaction() was disabled. I tried running it and it passed, so I pushed a commit to enable it. I did not bother with a PR since it was a single-line change.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1397639755


##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1068,6 +1069,38 @@ void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
                     .findFirst().map(Map.Entry::getKey).orElse(null);
             LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
           }
+        } else {
+
+          // compactionFailed is called from the Compactor when either a compaction fails or
+          // is cancelled and it's called from the DeadCompactionDetector. This block is
+          // entered when the conditional mutator above successfully deletes an ecid from
+          // the tablet metadata. Remove compaction tmp files from the tablet directory
+          // that have a corresponding ecid in the name.
+          String dirName = ctx.getAmple().readTablet(extent, ColumnType.DIR).getDirName();

Review Comment:
   Now looking in the tablet dir in each volume as of 46e8c63.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1397876093


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.net.HostAndPort;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+    @Parameter(names = "--delete")
+    boolean delete = false;
+  }
+
+  private static boolean allCompactorsDown(ClientContext context) {
+    // This is a copy of ExternalCompactionUtil.getCompactorAddrs that returns
+    // false if any compactor address is found. If there are no compactor addresses
+    // in any of the groups, then it returns true.
+    try {
+      final Map<String,List<HostAndPort>> groupsAndAddresses = new HashMap<>();
+      final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
+      ZooReader zooReader = context.getZooReader();
+      List<String> groups = zooReader.getChildren(compactorGroupsPath);
+      for (String group : groups) {
+        groupsAndAddresses.putIfAbsent(group, new ArrayList<>());
+        try {
+          List<String> compactors = zooReader.getChildren(compactorGroupsPath + "/" + group);
+          for (String compactor : compactors) {
+            // compactor is the address, we are checking to see if there is a child node which
+            // represents the compactor's lock as a check that it's alive.
+            List<String> children =
+                zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor);
+            if (!children.isEmpty()) {
+              LOG.trace("Found live compactor {} ", compactor);
+              return false;
+            }
+          }
+        } catch (NoNodeException e) {
+          LOG.trace("Ignoring node that went missing", e);
+        }
+      }
+      return true;
+    } catch (KeeperException e) {
+      throw new IllegalStateException(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static List<Path> findTempFiles(ServerContext context) throws InterruptedException {
+    final String pattern = "/tables/*/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files in volume: {} that match pattern: {}", vol, volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files =
+              vol.getFileSystem().globStatus(volPattern, (p) -> p.getName().contains("_tmp_ECID-"));
+          System.out.println(Arrays.toString(files));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, List<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> context.getVolumeManager().delete(p)));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }
+    delSvc.awaitTermination(10, TimeUnit.MINUTES);
+    return stats;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(FindCompactionTmpFiles.class.getName(), args);
+    LOG.info("Deleting compaction tmp files: {}", opts.delete);
+    Span span = TraceUtil.startSpan(FindCompactionTmpFiles.class, "main");
+    try (Scope scope = span.makeCurrent()) {
+      ServerContext context = opts.getServerContext();
+      if (!allCompactorsDown(context)) {
+        LOG.warn("Compactor addresses found in ZooKeeper. Unable to run this utility.");
+      }
+
+      final List<Path> matches = findTempFiles(context);
+      LOG.info("Found the following compaction tmp files:");
+      matches.forEach(p -> LOG.info("{}", p));
+

Review Comment:
   Modified the utility in 6e75f70 to remove the constraint that Compactors are down, and to remove the set of currently active external compaction output files from the set of compaction tmp files found on the filesystem.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1397881030


##########
core/src/main/java/org/apache/accumulo/core/metadata/ReferencedTabletFile.java:
##########
@@ -97,6 +97,8 @@ private static String constructErrorMsg(Path filePath) {
   public static FileParts parsePath(Path filePath) {
     // File name construct: <volume>/<tablePath>/<tableId>/<tablet>/<file>
     // Example: hdfs://namenode:9020/accumulo/tables/1/default_tablet/F00001.rf
+    // Example compaction tmp file:
+    // hdfs://namenode:9020/accumulo/tables/1/default_tablet/F00001.rf_tmp_ECID%3A<uuid>

Review Comment:
   ```suggestion
       // hdfs://namenode:9020/accumulo/tables/1/default_tablet/F00001.rf_tmp_ECID-<uuid>
   ```



##########
server/base/src/main/java/org/apache/accumulo/server/tablets/TabletNameGenerator.java:
##########
@@ -67,11 +68,12 @@ public static ReferencedTabletFile getNextDataFilename(FilePrefix prefix, Server
   }
 
   public static ReferencedTabletFile getNextDataFilenameForMajc(boolean propagateDeletes,
-      ServerContext context, TabletMetadata tabletMetadata, Consumer<String> dirCreator) {
+      ServerContext context, TabletMetadata tabletMetadata, Consumer<String> dirCreator,
+      ExternalCompactionId ecid) {
     String tmpFileName = getNextDataFilename(
         !propagateDeletes ? FilePrefix.MAJOR_COMPACTION_ALL_FILES : FilePrefix.MAJOR_COMPACTION,
-        context, tabletMetadata.getExtent(), tabletMetadata.getDirName(), dirName -> {}).insert()
-        .getMetadataPath() + "_tmp";
+        context, tabletMetadata.getExtent(), tabletMetadata.getDirName(), dirCreator).insert()

Review Comment:
   Note that `dirCreator` was unused prior to this commit.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1406476982


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static Set<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final Set<Path> matches = new ConcurrentSkipListSet<>();
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.trace("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.trace("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+          });
+    }
+    LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> {
+        if (context.getVolumeManager().exists(p)) {
+          return context.getVolumeManager().delete(p);
+        }
+        return true;
+      }));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }

Review Comment:
   The rationale behind my implementation is to reduce memory usage in the Manager as fast as possible. If you think that's not really a concern here, then I'm not opposed to implementing your suggestion.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1397679323


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.net.HostAndPort;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+    @Parameter(names = "--delete")
+    boolean delete = false;
+  }
+
+  private static boolean allCompactorsDown(ClientContext context) {
+    // This is a copy of ExternalCompactionUtil.getCompactorAddrs that returns
+    // false if any compactor address is found. If there are no compactor addresses
+    // in any of the groups, then it returns true.
+    try {
+      final Map<String,List<HostAndPort>> groupsAndAddresses = new HashMap<>();
+      final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
+      ZooReader zooReader = context.getZooReader();
+      List<String> groups = zooReader.getChildren(compactorGroupsPath);
+      for (String group : groups) {
+        groupsAndAddresses.putIfAbsent(group, new ArrayList<>());
+        try {
+          List<String> compactors = zooReader.getChildren(compactorGroupsPath + "/" + group);
+          for (String compactor : compactors) {
+            // compactor is the address, we are checking to see if there is a child node which
+            // represents the compactor's lock as a check that it's alive.
+            List<String> children =
+                zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor);
+            if (!children.isEmpty()) {
+              LOG.trace("Found live compactor {} ", compactor);
+              return false;
+            }
+          }
+        } catch (NoNodeException e) {
+          LOG.trace("Ignoring node that went missing", e);
+        }
+      }
+      return true;
+    } catch (KeeperException e) {
+      throw new IllegalStateException(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static List<Path> findTempFiles(ServerContext context) throws InterruptedException {
+    final String pattern = "/tables/*/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files in volume: {} that match pattern: {}", vol, volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files =
+              vol.getFileSystem().globStatus(volPattern, (p) -> p.getName().contains("_tmp_ECID-"));
+          System.out.println(Arrays.toString(files));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, List<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> context.getVolumeManager().delete(p)));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }
+    delSvc.awaitTermination(10, TimeUnit.MINUTES);
+    return stats;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(FindCompactionTmpFiles.class.getName(), args);
+    LOG.info("Deleting compaction tmp files: {}", opts.delete);
+    Span span = TraceUtil.startSpan(FindCompactionTmpFiles.class, "main");
+    try (Scope scope = span.makeCurrent()) {
+      ServerContext context = opts.getServerContext();
+      if (!allCompactorsDown(context)) {
+        LOG.warn("Compactor addresses found in ZooKeeper. Unable to run this utility.");
+      }
+
+      final List<Path> matches = findTempFiles(context);
+      LOG.info("Found the following compaction tmp files:");
+      matches.forEach(p -> LOG.info("{}", p));
+

Review Comment:
   I talked with @keith-turner and we agreed not to put this in the Manager for now, due to the memory impact of the globStatus results. I am going to change the utility, though, so that it does not expect all of the Compactors to be down and can be run at any time.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1396325133


##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1068,6 +1069,38 @@ void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
                     .findFirst().map(Map.Entry::getKey).orElse(null);
             LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
           }
+        } else {
+
+          // compactionFailed is called from the Compactor when either a compaction fails or
+          // is cancelled and it's called from the DeadCompactionDetector. This block is
+          // entered when the conditional mutator above successfully deletes an ecid from
+          // the tablet metadata. Remove compaction tmp files from the tablet directory
+          // that have a corresponding ecid in the name.
+          String dirName = ctx.getAmple().readTablet(extent, ColumnType.DIR).getDirName();

Review Comment:
   The tablet's dir is not a fully qualified path, it's just a name. Need to look for the dir in each volume (see the sketch below).
   
   It's possible the tablet no longer exists at this point and readTablet returns null.
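   
   A minimal sketch of resolving the dir across volumes, assuming the same path layout the coordinator change in this PR ends up using (Volume.getBasePath plus Constants.HDFS_TABLES_DIR):
   
   ```java
   // Sketch (context assumed): tablet metadata stores only the directory name,
   // so the bare name must be resolved against every configured volume.
   String dirName = tm.getDirName(); // e.g. "t-0000001"
   for (Volume vol : ctx.getVolumeManager().getVolumes()) {
     Path tabletDir = new Path(vol.getBasePath() + Constants.HDFS_TABLES_DIR
         + Path.SEPARATOR + extent.tableId().canonical() + Path.SEPARATOR + dirName);
     // list tabletDir on vol.getFileSystem() and match names ending in "_tmp_" + ecid
   }
   ```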
   
   I was looking into testing this code. If we modify the `ExternalDoNothingCompactor` to actually create a file, then maybe we could modify the following tests to wait for the file to exist before killing the compactor process (or starting a merge/split) and then wait for the file to be deleted by the coordinator:
   
    * ExternalCompaction_1_IT.testCompactionAndCompactorDies()  
    * ExternalCompaction_2_IT.testSplitCancelsExternalCompaction()
    * ExternalCompaction_2_IT.testUserCompactionCancellation()
    * ExternalCompaction_3_IT.testMergeCancelsExternalCompaction()
   
   
   
   



##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.net.HostAndPort;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+    @Parameter(names = "--delete")
+    boolean delete = false;
+  }
+
+  private static boolean allCompactorsDown(ClientContext context) {
+    // This is a copy of ExternalCompactionUtil.getCompactorAddrs that returns
+    // false if any compactor address is found. If there are no compactor addresses
+    // in any of the groups, then it returns true.
+    try {
+      final Map<String,List<HostAndPort>> groupsAndAddresses = new HashMap<>();
+      final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
+      ZooReader zooReader = context.getZooReader();
+      List<String> groups = zooReader.getChildren(compactorGroupsPath);
+      for (String group : groups) {
+        groupsAndAddresses.putIfAbsent(group, new ArrayList<>());
+        try {
+          List<String> compactors = zooReader.getChildren(compactorGroupsPath + "/" + group);
+          for (String compactor : compactors) {
+            // compactor is the address, we are checking to see if there is a child node which
+            // represents the compactor's lock as a check that it's alive.
+            List<String> children =
+                zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor);
+            if (!children.isEmpty()) {
+              LOG.trace("Found live compactor {} ", compactor);
+              return false;
+            }
+          }
+        } catch (NoNodeException e) {
+          LOG.trace("Ignoring node that went missing", e);
+        }
+      }
+      return true;
+    } catch (KeeperException e) {
+      throw new IllegalStateException(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static List<Path> findTempFiles(ServerContext context) throws InterruptedException {
+    final String pattern = "/tables/*/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files in volume: {} that match pattern: {}", vol, volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files =
+              vol.getFileSystem().globStatus(volPattern, (p) -> p.getName().contains("_tmp_ECID-"));
+          System.out.println(Arrays.toString(files));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, List<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> context.getVolumeManager().delete(p)));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }
+    delSvc.awaitTermination(10, TimeUnit.MINUTES);
+    return stats;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(FindCompactionTmpFiles.class.getName(), args);
+    LOG.info("Deleting compaction tmp files: {}", opts.delete);
+    Span span = TraceUtil.startSpan(FindCompactionTmpFiles.class, "main");
+    try (Scope scope = span.makeCurrent()) {
+      ServerContext context = opts.getServerContext();
+      if (!allCompactorsDown(context)) {
+        LOG.warn("Compactor addresses found in ZooKeeper. Unable to run this utility.");
+      }
+
+      final List<Path> matches = findTempFiles(context);
+      LOG.info("Found the following compaction tmp files:");
+      matches.forEach(p -> LOG.info("{}", p));
+

Review Comment:
   At this point we could scan the metadata table, get the active ECIDs, and remove paths from matches that have an active ECID. Scanning for only tablets that have an ECID would be another good use case for #3933. Would need to scan all three levels of metadata.
   
   That would make this utility able to be run while compactors are running. If that is the case, then the compaction coordinator could possibly call this code periodically (like once a day by default). The other code for deleting temp files could miss files if the coordinator dies at a certain time, or because of race conditions with split and merge that delete a tablet and create new tablets. Periodically running this would still clean up things missed by the other code.
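   
   A sketch of that subtraction, mirroring the code the updated utility (quoted earlier in this thread) ends up using:
   
   ```java
   // Remove tmp paths that belong to still-active external compactions,
   // scanning all three metadata levels.
   for (DataLevel level : DataLevel.values()) {
     context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
         .forEach(tm -> tm.getExternalCompactions().values()
             .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath())));
   }
   ```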



##########
core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java:
##########
@@ -67,4 +67,10 @@ public static ExternalCompactionId from(String ecid) {
     return of(ecid);
   }
 
+  public String encodeForFileName() {
+    // A colon in the file name causes issues in HDFS. Use a different character
+    // that will not be URLEncoded
+    return canonical().replace(':', '-');

Review Comment:
   Can we change the PREFIX constant and remove this method?
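   
   A sketch of that direction (the current value of PREFIX is not shown in this thread; a ':' in it is implied by the replace above):
   
   ```java
   // Hypothetical: make the canonical id filesystem-safe at the source,
   // so a separate encodeForFileName() is unnecessary.
   public static final String PREFIX = "ECID-";
   ```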





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1402466931


##########
test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.tablets.TabletNameGenerator;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class FindCompactionTmpFilesIT extends SharedMiniClusterBase {

Review Comment:
   If possible it would be nice to verify that the default behavior of calling main is not to delete. I saw the util has a `--delete` arg that defaults to false.
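   
   A rough sketch of such a check (argument handling and harness accessors assumed, not taken from this thread):
   
   ```java
   // Hypothetical: run main without --delete (the default) and assert the
   // known tmp files still exist afterwards.
   FindCompactionTmpFiles.main(new String[] {/* cluster connection args */});
   Set<Path> remaining =
       FindCompactionTmpFiles.findTempFiles(getCluster().getServerContext(), null);
   assertEquals(createdTmpFiles, remaining);
   ```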



##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -1097,6 +1097,11 @@ public enum Property {
   @Experimental
   COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
       "Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"),
+  COMPACTOR_CANCEL_CHECK_INTERVAL("compactor.cancel.check.interval", "5m",
+      PropertyType.TIMEDURATION,
+      "Interval at which Compactors will check to see if the currently executing compaction"
+          + " should be cancelled.",

Review Comment:
   ```suggestion
             + " should be cancelled.  This checks for situations like was the tablet deleted (split and merge do this), was the table deleted, was a user compaction canceled, etc",
   ```
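   
   For example, the check interval could then be tuned in accumulo.properties (default shown):
   
   ```
   compactor.cancel.check.interval=5m
   ```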



##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1068,8 +1088,73 @@ void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
                     .findFirst().map(Map.Entry::getKey).orElse(null);
             LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
           }
+        } else {
+          // compactionFailed is called from the Compactor when either a compaction fails or
+          // is cancelled and it's called from the DeadCompactionDetector. This block is
+          // entered when the conditional mutator above successfully deletes an ecid from
+          // the tablet metadata. Remove compaction tmp files from the tablet directory
+          // that have a corresponding ecid in the name.
+
+          ecidsForTablet.clear();
+          compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0)
+              .map(Entry::getKey).forEach(ecidsForTablet::add);
+
+          if (!ecidsForTablet.isEmpty()) {
+            final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR);
+            if (tm != null) {
+              final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
+              for (Volume vol : vols) {
+                try {
+                  final String volPath =
+                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+                          + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName();
+                  final FileSystem fs = vol.getFileSystem();
+                  for (ExternalCompactionId ecid : ecidsForTablet) {
+                    final String fileSuffix = "_tmp_" + ecid.canonical();
+                    FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> {
+                      return path.getName().endsWith(fileSuffix);
+                    });
+                    if (files.length > 0) {
+                      for (FileStatus file : files) {
+                        if (!fs.delete(file.getPath(), false)) {
+                          LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath());
+                        } else {
+                          LOG.debug("Deleted ecid tmp file: {}", file.getPath());
+                        }
+                      }
+                    }
+                  }
+                } catch (IOException e) {
+                  LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e);
+                }
+              }
+            } else {
+              // TabletMetadata does not exist for the extent. This could be due to a merge or
+              // split operation. Use the utility to find tmp files at the table level
+              missingExtentTables.add(extent.tableId());
+            }
+          }
         }
       });
+
+      if (!missingExtentTables.isEmpty()) {
+        for (TableId tid : missingExtentTables) {
+          try {
+            final Set<Path> matches = FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical());

Review Comment:
   This could be expensive in the case where lots of compactions fail and end up here, but in separate function calls.  Could make `missingExtentTables` a set at the CompactionCoordinator level and just add to the set in this function, then have a new thread that periodically processes the set.  This could be a follow-on issue; a rough sketch is below.
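   
   A rough sketch of that follow-on shape (the names here are hypothetical, and the signatures assume the revision of `FindCompactionTmpFiles` that takes and returns `Set<Path>`):
   
   ```java
   // Coordinator-level state: table ids whose extents were missing when
   // compactionFailed ran, waiting for a background sweep.
   private final Set<TableId> missingExtentTables = ConcurrentHashMap.newKeySet();
   
   // compactionFailed() would only record the table id:
   //   missingExtentTables.add(extent.tableId());
   
   // Run periodically from a scheduled thread at the coordinator level.
   private void sweepMissingExtentTables() throws InterruptedException {
     Iterator<TableId> iter = missingExtentTables.iterator();
     while (iter.hasNext()) {
       TableId tid = iter.next();
       iter.remove();
       Set<Path> matches = FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical());
       if (!matches.isEmpty()) {
         FindCompactionTmpFiles.deleteTempFiles(ctx, matches);
       }
     }
   }
   ```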



##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {

Review Comment:
   This could be a follow-on issue; it may be nice to add this where it can be run from the `accumulo` command.  A sketch of one way to wire that up is below.
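   
   A hedged sketch of hooking the utility into the `accumulo` launcher via the `KeywordExecutable` SPI (the method names follow the SPI as of 2.1 and would need verifying, as would the `META-INF/services` registration):
   
   ```java
   import org.apache.accumulo.start.spi.KeywordExecutable;
   
   // Registered in META-INF/services/org.apache.accumulo.start.spi.KeywordExecutable
   public class FindCompactionTmpFilesExecutable implements KeywordExecutable {
   
     @Override
     public String keyword() {
       return "find-compaction-tmp-files";
     }
   
     @Override
     public String description() {
       return "Finds, and optionally deletes, orphaned compaction tmp files";
     }
   
     @Override
     public void execute(String[] args) throws Exception {
       FindCompactionTmpFiles.main(args);
     }
   }
   ```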





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1397639203


##########
core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java:
##########
@@ -67,4 +67,10 @@ public static ExternalCompactionId from(String ecid) {
     return of(ecid);
   }
 
+  public String encodeForFileName() {
+    // A colon in the file name causes issues in HDFS. Use a different character
+    // that will not be URLEncoded
+    return canonical().replace(':', '-');

Review Comment:
   Done in 46e8c63





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1396342341


##########
core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java:
##########
@@ -67,4 +67,10 @@ public static ExternalCompactionId from(String ecid) {
     return of(ecid);
   }
 
+  public String encodeForFileName() {
+    // A colon in the file name causes issues in HDFS. Use a different character
+    // that will not be URLEncoded
+    return canonical().replace(':', '-');

Review Comment:
   We could do that.  We just need to use something that will work in Hadoop filenames *and* won't be changed during URL encoding.  For some reason URL encoding was happening twice, which led to issues.  Changing the `:` to a `-` in the ECID prefix should work; a small illustration is below.
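   
   A small illustration of why `-` is a safe choice (plain JDK calls; the ECID value is made up):
   
   ```java
   import java.net.URLEncoder;
   import java.nio.charset.StandardCharsets;
   
   String ecid = "ECID:4242";
   String forFileName = ecid.replace(':', '-');  // "ECID-4242", legal in HDFS paths
   
   // ':' is rewritten by URL encoding, and double-encoding mangles it further:
   String once = URLEncoder.encode(ecid, StandardCharsets.UTF_8);   // "ECID%3A4242"
   String twice = URLEncoder.encode(once, StandardCharsets.UTF_8);  // "ECID%253A4242"
   
   // '-' is an unreserved character, so the name is stable however often it is encoded:
   assert URLEncoder.encode(forFileName, StandardCharsets.UTF_8).equals(forFileName);
   ```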





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1399878926


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static List<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.debug("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));

Review Comment:
   `matches` is a list; could make it a set since `remove` is called on it here.



##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static List<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);

Review Comment:
   Based on the other comment about this variable, could change it to the following (`ConcurrentHashMap` would also need to be imported).
   
   ```suggestion
       final Set<Path> matches = ConcurrentHashMap.newKeySet();
   ```



##########
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java:
##########
@@ -130,6 +136,10 @@ public void testSplitCancelsExternalCompaction() throws Exception {
       // compaction above in the test. Even though the external compaction was cancelled
       // because we split the table, FaTE will continue to queue up a compaction
       client.tableOperations().cancelCompaction(table1);
+
+      // Verify that the tmp file are cleaned up
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 1);

Review Comment:
   Should this wait for zero?
   
   ```suggestion
             .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 0);
   ```



##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static List<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.debug("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+          });
+    }
+    LOG.debug("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, List<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> context.getVolumeManager().delete(p)));

Review Comment:
   Will this throw an exception when the file does not exist?  This util may find files that the system was in the process of renaming or deleting anyway; it would be nice to not report an error when the file no longer exists.
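   
   One way to make the delete tolerant of files that are already gone (this mirrors the `exists()` guard a later revision of the PR adds):
   
   ```java
   filesToDelete.forEach(p -> {
     futures.add(delSvc.submit(() -> {
       // A file removed or renamed since the scan is not an error.
       if (context.getVolumeManager().exists(p)) {
         return context.getVolumeManager().delete(p);
       }
       return true;
     }));
   });
   ```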



##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static List<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));

Review Comment:
   Multiple threads are adding to `matches`, which is not a concurrent or synchronized collection.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1396418908


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.net.HostAndPort;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+    @Parameter(names = "--delete")
+    boolean delete = false;
+  }
+
+  private static boolean allCompactorsDown(ClientContext context) {
+    // This is a copy of ExternalCompactionUtil.getCompactorAddrs that returns
+    // false if any compactor address is found. If there are no compactor addresses
+    // in any of the groups, then it returns true.
+    try {
+      final Map<String,List<HostAndPort>> groupsAndAddresses = new HashMap<>();
+      final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
+      ZooReader zooReader = context.getZooReader();
+      List<String> groups = zooReader.getChildren(compactorGroupsPath);
+      for (String group : groups) {
+        groupsAndAddresses.putIfAbsent(group, new ArrayList<>());
+        try {
+          List<String> compactors = zooReader.getChildren(compactorGroupsPath + "/" + group);
+          for (String compactor : compactors) {
+            // compactor is the address, we are checking to see if there is a child node which
+            // represents the compactor's lock as a check that it's alive.
+            List<String> children =
+                zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor);
+            if (!children.isEmpty()) {
+              LOG.trace("Found live compactor {} ", compactor);
+              return false;
+            }
+          }
+        } catch (NoNodeException e) {
+          LOG.trace("Ignoring node that went missing", e);
+        }
+      }
+      return true;
+    } catch (KeeperException e) {
+      throw new IllegalStateException(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static List<Path> findTempFiles(ServerContext context) throws InterruptedException {
+    final String pattern = "/tables/*/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files in volume: {} that match pattern: {}", vol, volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files =
+              vol.getFileSystem().globStatus(volPattern, (p) -> p.getName().contains("_tmp_ECID-"));
+          System.out.println(Arrays.toString(files));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, List<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> context.getVolumeManager().delete(p)));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }
+    delSvc.awaitTermination(10, TimeUnit.MINUTES);
+    return stats;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(FindCompactionTmpFiles.class.getName(), args);
+    LOG.info("Deleting compaction tmp files: {}", opts.delete);
+    Span span = TraceUtil.startSpan(FindCompactionTmpFiles.class, "main");
+    try (Scope scope = span.makeCurrent()) {
+      ServerContext context = opts.getServerContext();
+      if (!allCompactorsDown(context)) {
+        LOG.warn("Compactor addresses found in ZooKeeper. Unable to run this utility.");
+      }
+
+      final List<Path> matches = findTempFiles(context);
+      LOG.info("Found the following compaction tmp files:");
+      matches.forEach(p -> LOG.info("{}", p));
+

Review Comment:
   One concern I have for running this in the CompactionCoordinator, which is running in the Manager, is memory usage in the case where there are a *lot* of tmp files that need to be cleaned up.  The call to `FileSystem.globStatus` is going to return an unknown number of `FileStatus` objects.
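   
   A hedged sketch of a lower-memory alternative: stream the listing through a `RemoteIterator` instead of materializing every match from `globStatus` (this trades the glob for a recursive walk under `/tables`, so it may do more IO; error handling elided):
   
   ```java
   import org.apache.hadoop.fs.LocatedFileStatus;
   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.fs.RemoteIterator;
   
   RemoteIterator<LocatedFileStatus> it =
       vol.getFileSystem().listFiles(new Path(vol.getBasePath() + "/tables"), true);
   while (it.hasNext()) {
     Path p = it.next().getPath();
     if (p.getName().contains("_tmp_ECID-")) {
       // Each match can be handled (or deleted) as it streams by rather
       // than being collected into one large in-memory list.
       matches.add(p);
     }
   }
   ```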





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1400854636


##########
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java:
##########
@@ -130,6 +136,10 @@ public void testSplitCancelsExternalCompaction() throws Exception {
       // compaction above in the test. Even though the external compaction was cancelled
       // because we split the table, FaTE will continue to queue up a compaction
       client.tableOperations().cancelCompaction(table1);
+
+      // Verify that the tmp file are cleaned up
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 1);

Review Comment:
   Fixed in 9f4315a





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1406323165


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static Set<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final Set<Path> matches = new ConcurrentSkipListSet<>();
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.trace("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.trace("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+          });
+    }
+    LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> {
+        if (context.getVolumeManager().exists(p)) {
+          return context.getVolumeManager().delete(p);
+        }
+        return true;
+      }));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }

Review Comment:
   Could remove this sleep and instead always call `get()`.  This avoids unneeded sleeping when things are done.  Also adjusted the log message to be a bit more specific about what is being waited on.  (`TimeoutException` would also need to be imported.)
   
   
   ```suggestion
       int expectedResponses = filesToDelete.size();
       Iterator<Future<Boolean>> iter = futures.iterator();
       while (iter.hasNext()) {
         Future<Boolean> future = iter.next();
         try {
           while (true) {
             try {
               if (future.get(10, TimeUnit.SECONDS)) {
                 stats.success++;
               } else {
                 stats.failure++;
               }
               break;
             } catch (TimeoutException e) {
               LOG.debug("Waiting on {} background delete operations", expectedResponses);
             }
           }
         } catch (ExecutionException e) {
           stats.error++;
           LOG.error("Error deleting a compaction tmp file", e);
         }
         expectedResponses--;
       }
   ```



##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static Set<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final Set<Path> matches = new ConcurrentSkipListSet<>();
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.trace("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);

Review Comment:
   Could also remove this sleep and always call `get()` on the future; a sketch is below.
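   
   A sketch of that simplification for the search futures (assuming the enclosing method keeps `throws InterruptedException`, so only `ExecutionException` needs handling):
   
   ```java
   for (Future<Void> future : futures) {
     try {
       future.get();
     } catch (ExecutionException e) {
       throw new RuntimeException("Error getting list of tmp files", e);
     }
   }
   ```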





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1406471993


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static Set<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final Set<Path> matches = new ConcurrentSkipListSet<>();
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.trace("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.trace("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+          });
+    }
+    LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> {
+        if (context.getVolumeManager().exists(p)) {
+          return context.getVolumeManager().delete(p);
+        }
+        return true;
+      }));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }

Review Comment:
   If we don't care about the intermediate logging while waiting, then only calling `get()` on the futures without the timeout would make the code much simpler.
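   
   For reference, a sketch of that simpler shape (again assuming the enclosing method keeps `throws InterruptedException`):
   
   ```java
   for (Future<Boolean> future : futures) {
     try {
       if (future.get()) {
         stats.success++;
       } else {
         stats.failure++;
       }
     } catch (ExecutionException e) {
       stats.error++;
       LOG.error("Error deleting a compaction tmp file", e);
     }
   }
   ```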





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1406461255


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static Set<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final Set<Path> matches = new ConcurrentSkipListSet<>();
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.trace("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.trace("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+          });
+    }
+    LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> {
+        if (context.getVolumeManager().exists(p)) {
+          return context.getVolumeManager().delete(p);
+        }
+        return true;
+      }));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }

Review Comment:
   @keith-turner - I didn't implement your suggestion, but I believe I addressed your concern about the unnecessary sleep in 602b6f2. My implementation checks each future without waiting, whereas yours would wait for the next one to complete before making progress.
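   
   Roughly the shape of that change (the exact contents of 602b6f2 are an assumption here):
   
   ```java
   // Check each future without blocking; only back off briefly when
   // nothing has completed yet, instead of always sleeping 10 seconds.
   while (!futures.isEmpty()) {
     boolean anyDone = false;
     Iterator<Future<Boolean>> iter = futures.iterator();
     while (iter.hasNext()) {
       Future<Boolean> future = iter.next();
       if (future.isDone()) {
         anyDone = true;
         iter.remove();
         try {
           if (future.get()) {
             stats.success++;
           } else {
             stats.failure++;
           }
         } catch (ExecutionException e) {
           stats.error++;
           LOG.error("Error deleting a compaction tmp file", e);
         }
       }
     }
     if (!anyDone) {
       UtilWaitThread.sleep(100);
     }
   }
   ```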





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1397875226


##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1068,6 +1069,38 @@ void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
                     .findFirst().map(Map.Entry::getKey).orElse(null);
             LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
           }
+        } else {
+
+          // compactionFailed is called from the Compactor when either a compaction fails or
+          // is cancelled and it's called from the DeadCompactionDetector. This block is
+          // entered when the conditional mutator above successfully deletes an ecid from
+          // the tablet metadata. Remove compaction tmp files from the tablet directory
+          // that have a corresponding ecid in the name.
+          String dirName = ctx.getAmple().readTablet(extent, ColumnType.DIR).getDirName();

Review Comment:
   Modified DoNothingExternalCompactor to create tmp file, added checks in the tests listed above, in 6e75f70





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#issuecomment-1815354158

   This would not be a change for this PR, and I'm not sure about the concept, but looking at this PR made me think of putting compaction tmp files in a different place instead of the tablet dir.
   
   For example, each volume could have a per-table dir like `compactions/<table id>` where active compactions store their output. The benefit is that this makes it quick to find all files for running compactions. One downside is that it adds code complexity (e.g., this dir would need to be cleaned up on table deletion).
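   
   As a purely hypothetical illustration of that layout (the `compactionTmpDir` helper and the `compactions/<table id>` path are not part of this PR):
   
   ```java
   // Hypothetical: one per-table dir per volume for in-progress compaction output.
   static Path compactionTmpDir(Volume vol, TableId tableId) {
     return new Path(vol.getBasePath(), "compactions/" + tableId.canonical());
   }
   ```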




Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#issuecomment-1815365636

   A similar thought crossed my mind, but I was just thinking of one directory, not per-table. A couple of nice things about this (with or without the per-table subdirectory) are that you could lower the replication on the tmp files or change the erasure-coding configuration on the tmp directory. The downside, though, is that I think you would have to copy the files to their destination instead of moving them. If the directory were configurable, you could store the temp files on a totally different volume, which would also require a copy.
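   
   For example, with the stock Hadoop FileSystem API the replication on a tmp file could in principle be lowered; this is a sketch only, where `vol` and `tmpPath` stand in for the volume and tmp file in question, and whether Accumulo would actually do this is an open question:
   
   ```java
   // Fewer replicas are tolerable for tmp output, since a failed compaction
   // can simply be re-run to re-create the file.
   FileSystem fs = vol.getFileSystem();
   fs.setReplication(tmpPath, (short) 1);
   ```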




Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1396337296


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.net.HostAndPort;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+    @Parameter(names = "--delete")
+    boolean delete = false;
+  }
+
+  private static boolean allCompactorsDown(ClientContext context) {
+    // This is a copy of ExternalCompactionUtil.getCompactorAddrs that returns
+    // false if any compactor address is found. If there are no compactor addresses
+    // in any of the groups, then it returns true.
+    try {
+      final Map<String,List<HostAndPort>> groupsAndAddresses = new HashMap<>();
+      final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
+      ZooReader zooReader = context.getZooReader();
+      List<String> groups = zooReader.getChildren(compactorGroupsPath);
+      for (String group : groups) {
+        groupsAndAddresses.putIfAbsent(group, new ArrayList<>());
+        try {
+          List<String> compactors = zooReader.getChildren(compactorGroupsPath + "/" + group);
+          for (String compactor : compactors) {
+            // compactor is the address, we are checking to see if there is a child node which
+            // represents the compactor's lock as a check that it's alive.
+            List<String> children =
+                zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor);
+            if (!children.isEmpty()) {
+              LOG.trace("Found live compactor {} ", compactor);
+              return false;
+            }
+          }
+        } catch (NoNodeException e) {
+          LOG.trace("Ignoring node that went missing", e);
+        }
+      }
+      return true;
+    } catch (KeeperException e) {
+      throw new IllegalStateException(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static List<Path> findTempFiles(ServerContext context) throws InterruptedException {
+    final String pattern = "/tables/*/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files in volume: {} that match pattern: {}", vol, volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files =
+              vol.getFileSystem().globStatus(volPattern, (p) -> p.getName().contains("_tmp_ECID-"));
+          System.out.println(Arrays.toString(files));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, List<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> context.getVolumeManager().delete(p)));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }
+    delSvc.awaitTermination(10, TimeUnit.MINUTES);
+    return stats;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(FindCompactionTmpFiles.class.getName(), args);
+    LOG.info("Deleting compaction tmp files: {}", opts.delete);
+    Span span = TraceUtil.startSpan(FindCompactionTmpFiles.class, "main");
+    try (Scope scope = span.makeCurrent()) {
+      ServerContext context = opts.getServerContext();
+      if (!allCompactorsDown(context)) {
+        LOG.warn("Compactor addresses found in ZooKeeper. Unable to run this utility.");
+      }
+
+      final List<Path> matches = findTempFiles(context);
+      LOG.info("Found the following compaction tmp files:");
+      matches.forEach(p -> LOG.info("{}", p));
+

Review Comment:
   At this point we could scan the metadata table, get the active ECIDs, and remove paths from `matches` that have an active ECID. Scanning only the tablets that have an ECID would be another good use case for #3933. We would need to scan all three levels of metadata.
   
   That would make this utility able to run while compactors are running. If that is the case, then the compaction coordinator could possibly call this code periodically (like once a day by default). The other code for deleting temp files could miss files if the coordinator dies at the wrong time, or because of race conditions with split and merge that delete a tablet and create new tablets. Periodically running this would still clean up things missed by the other code.
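   
   Something along these lines, which is essentially the loop a later revision of the PR adopted:
   
   ```java
   // Drop tmp paths that belong to still-active external compactions,
   // checking all three metadata levels.
   for (DataLevel level : DataLevel.values()) {
     context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
         .forEach(tm -> tm.getExternalCompactions().values()
             .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath())));
   }
   ```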





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#issuecomment-1815338469

   > Pretty sure the idea was yours in the original ticket :-)
   
   I just looked at the issue. I did not remember that. Good thing we are opening so many issues as we go, so things are not forgotten.




Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1406494009


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static Set<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final Set<Path> matches = new ConcurrentSkipListSet<>();
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.trace("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.trace("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+          });
+    }
+    LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> {
+        if (context.getVolumeManager().exists(p)) {
+          return context.getVolumeManager().delete(p);
+        }
+        return true;
+      }));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }

Review Comment:
   > The rationale behind my implementation is to reduce memory usage in the Manager as fast as possible. If you think that's not really a concern here, then I'm not opposed to implementing your suggestion.
   
   I like that goal; it's more important than minimizing the wait time. Minimizing the wait time probably only benefits ITs.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1400855603


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static List<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));

Review Comment:
   Made a ConcurrentSkipListSet in 9f4315a
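   
   That is, roughly this declaration, as seen in the later revision quoted in this thread:
   
   ```java
   // Thread-safe sorted set, so the per-volume scanning tasks can add
   // matches concurrently without external synchronization.
   final Set<Path> matches = new ConcurrentSkipListSet<>();
   ```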



##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static List<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.debug("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));

Review Comment:
   Made a ConcurrentSkipListSet in 9f4315a





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1402683416


##########
core/src/main/java/org/apache/accumulo/core/conf/Property.java:
##########
@@ -1097,6 +1097,11 @@ public enum Property {
   @Experimental
   COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
       "Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"),
+  COMPACTOR_CANCEL_CHECK_INTERVAL("compactor.cancel.check.interval", "5m",
+      PropertyType.TIMEDURATION,
+      "Interval at which Compactors will check to see if the currently executing compaction"
+          + " should be cancelled.",

Review Comment:
   Property description updated in a6eac57





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1402744220


##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1068,8 +1088,73 @@ void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
                     .findFirst().map(Map.Entry::getKey).orElse(null);
             LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
           }
+        } else {
+          // compactionFailed is called from the Compactor when either a compaction fails or
+          // is cancelled and it's called from the DeadCompactionDetector. This block is
+          // entered when the conditional mutator above successfully deletes an ecid from
+          // the tablet metadata. Remove compaction tmp files from the tablet directory
+          // that have a corresponding ecid in the name.
+
+          ecidsForTablet.clear();
+          compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0)
+              .map(Entry::getKey).forEach(ecidsForTablet::add);
+
+          if (!ecidsForTablet.isEmpty()) {
+            final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR);
+            if (tm != null) {
+              final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
+              for (Volume vol : vols) {
+                try {
+                  final String volPath =
+                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+                          + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName();
+                  final FileSystem fs = vol.getFileSystem();
+                  for (ExternalCompactionId ecid : ecidsForTablet) {
+                    final String fileSuffix = "_tmp_" + ecid.canonical();
+                    FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> {
+                      return path.getName().endsWith(fileSuffix);
+                    });
+                    if (files.length > 0) {
+                      for (FileStatus file : files) {
+                        if (!fs.delete(file.getPath(), false)) {
+                          LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath());
+                        } else {
+                          LOG.debug("Deleted ecid tmp file: {}", file.getPath());
+                        }
+                      }
+                    }
+                  }
+                } catch (IOException e) {
+                  LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e);
+                }
+              }
+            } else {
+              // TabletMetadata does not exist for the extent. This could be due to a merge or
+              // split operation. Use the utility to find tmp files at the table level
+              missingExtentTables.add(extent.tableId());
+            }
+          }
         }
       });
+
+      if (!missingExtentTables.isEmpty()) {
+        for (TableId tid : missingExtentTables) {
+          try {
+            final Set<Path> matches = FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical());

Review Comment:
   I implemented my idea above in c06ce0c. The ExternalCompaction ITs are still passing with this change.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1406536054


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {

Review Comment:
   Created #3986 for this.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion merged PR #3955:
URL: https://github.com/apache/accumulo/pull/3955




Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#issuecomment-1815330037

   > > Modified TabletNameGenerator.getNextDataFilenameForMajc to add the external compaction id to the filename.
   > 
   > Putting the ECID at the end of the tmp file name is a really neat solution to this problem.
   
   Pretty sure the idea was yours in the original ticket :-)
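   
   For context, the suffix matching used elsewhere in this PR implies tmp file names of roughly this shape (the base file name below is a made-up example):
   
   ```java
   // Matches names like ".../tables/1/t-0000001/A0000002.rf_tmp_ECID-<uuid>"
   String fileSuffix = "_tmp_" + ecid.canonical();
   ```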
   




Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1398040431


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static List<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.debug("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    context.getAmple().readTablets().forLevel(DataLevel.ROOT).fetch(ColumnType.ECOMP).build()
+        .forEach(tm -> {
+          tm.getExternalCompactions().values()
+              .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+        });
+    context.getAmple().readTablets().forLevel(DataLevel.METADATA).fetch(ColumnType.ECOMP).build()
+        .forEach(tm -> {
+          tm.getExternalCompactions().values()
+              .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+        });
+    context.getAmple().readTablets().forLevel(DataLevel.USER).fetch(ColumnType.ECOMP).build()
+        .forEach(tm -> {
+          tm.getExternalCompactions().values()
+              .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+        });

Review Comment:
   TODO: Turn this into one loop using the DataLevel as the loop variable.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1400577538


##########
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_2_IT.java:
##########
@@ -130,6 +136,10 @@ public void testSplitCancelsExternalCompaction() throws Exception {
       // compaction above in the test. Even though the external compaction was cancelled
       // because we split the table, FaTE will continue to queue up a compaction
       client.tableOperations().cancelCompaction(table1);
+
+      // Verify that the tmp files are cleaned up
+      Wait.waitFor(() -> FindCompactionTmpFiles
+          .findTempFiles(getCluster().getServerContext(), tid.canonical()).size() == 1);

Review Comment:
   Good catch - copy/paste error. I found another instance of it too.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1400854193


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static List<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.debug("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+          });
+    }
+    LOG.debug("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, List<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> context.getVolumeManager().delete(p)));

Review Comment:
   I'm not sure if it would throw an exception or just return false; the javadoc doesn't say. I guarded it with an exists(Path) call in 9f4315a just in case.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1406470367


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static Set<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final Set<Path> matches = new ConcurrentSkipListSet<>();
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.trace("Looking for tmp files that match pattern: {}", volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files = vol.getFileSystem().globStatus(volPattern,
+              (p) -> p.getName().contains("_tmp_" + ExternalCompactionId.PREFIX));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    LOG.trace("Found compaction tmp files: {}", matches);
+
+    // Remove paths of all active external compaction output files from the set of
+    // tmp files found on the filesystem. This must be done *after* gathering the
+    // matches on the filesystem.
+    for (DataLevel level : DataLevel.values()) {
+      context.getAmple().readTablets().forLevel(level).fetch(ColumnType.ECOMP).build()
+          .forEach(tm -> {
+            tm.getExternalCompactions().values()
+                .forEach(ecm -> matches.remove(ecm.getCompactTmpName().getPath()));
+          });
+    }
+    LOG.trace("Final set of compaction tmp files after removing active compactions: {}", matches);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, Set<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> {
+        if (context.getVolumeManager().exists(p)) {
+          return context.getVolumeManager().delete(p);
+        }
+        return true;
+      }));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }

Review Comment:
   > I didn't implement your suggestion, because my implementation checked each future without waiting, whereas yours would wait for the next one to complete before making progress.
   
   Either way, the code will not progress until all futures are done. I was thinking calling get() is the most time-efficient way to do this because it waits the minimal time for any single future to complete. If we wait on one future while others after it finish, that is fine: once get() is called on those, it returns immediately. So calling only get() should bound the total wait at the slowest future and no more.
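   
   A rough sketch of that simpler loop, reusing the names from the deleteTempFiles hunk above (illustrative only, not the actual patch; the enclosing method already declares `throws InterruptedException`):
   
     // get() blocks only until the given future completes; futures that
     // finished earlier return immediately, so the total wait is bounded
     // by the slowest delete rather than by fixed 10s sleep intervals.
     for (Future<Boolean> future : futures) {
       try {
         if (future.get()) {
           stats.success++;
         } else {
           stats.failure++;
         }
       } catch (ExecutionException e) {
         stats.error++;
         LOG.error("Error deleting a compaction tmp file", e);
       }
     }
     delSvc.awaitTermination(10, TimeUnit.MINUTES);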





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1402690283


##########
server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java:
##########
@@ -1068,8 +1088,73 @@ void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
                     .findFirst().map(Map.Entry::getKey).orElse(null);
             LOG.debug("Unable to remove failed compaction {} {}", extent, ecid);
           }
+        } else {
+          // compactionFailed is called from the Compactor when either a compaction fails or
+          // is cancelled and it's called from the DeadCompactionDetector. This block is
+          // entered when the conditional mutator above successfully deletes an ecid from
+          // the tablet metadata. Remove compaction tmp files from the tablet directory
+          // that have a corresponding ecid in the name.
+
+          ecidsForTablet.clear();
+          compactions.entrySet().stream().filter(e -> e.getValue().compareTo(extent) == 0)
+              .map(Entry::getKey).forEach(ecidsForTablet::add);
+
+          if (!ecidsForTablet.isEmpty()) {
+            final TabletMetadata tm = ctx.getAmple().readTablet(extent, ColumnType.DIR);
+            if (tm != null) {
+              final Collection<Volume> vols = ctx.getVolumeManager().getVolumes();
+              for (Volume vol : vols) {
+                try {
+                  final String volPath =
+                      vol.getBasePath() + Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+                          + extent.tableId().canonical() + Path.SEPARATOR + tm.getDirName();
+                  final FileSystem fs = vol.getFileSystem();
+                  for (ExternalCompactionId ecid : ecidsForTablet) {
+                    final String fileSuffix = "_tmp_" + ecid.canonical();
+                    FileStatus[] files = fs.listStatus(new Path(volPath), (path) -> {
+                      return path.getName().endsWith(fileSuffix);
+                    });
+                    if (files.length > 0) {
+                      for (FileStatus file : files) {
+                        if (!fs.delete(file.getPath(), false)) {
+                          LOG.warn("Unable to delete ecid tmp file: {}: ", file.getPath());
+                        } else {
+                          LOG.debug("Deleted ecid tmp file: {}", file.getPath());
+                        }
+                      }
+                    }
+                  }
+                } catch (IOException e) {
+                  LOG.error("Exception deleting compaction tmp files for tablet: {}", extent, e);
+                }
+              }
+            } else {
+              // TabletMetadata does not exist for the extent. This could be due to a merge or
+              // split operation. Use the utility to find tmp files at the table level
+              missingExtentTables.add(extent.tableId());
+            }
+          }
         }
       });
+
+      if (!missingExtentTables.isEmpty()) {
+        for (TableId tid : missingExtentTables) {
+          try {
+            final Set<Path> matches = FindCompactionTmpFiles.findTempFiles(ctx, tid.canonical());

Review Comment:
   > Could make missingExtentTables a set at the compaction coordinator level
   
   Could make this Set a class variable in the DeadCompactionDetector, then call FindCompactionTmpFiles from `detectDeadCompactions`, which already runs in another thread on a configurable time interval. See the sketch below.
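   
   Something like this hypothetical shape (the field and method names are illustrative, not the committed code; assumes the detector holds a `ServerContext` named `context`):
   
     // Hypothetical fields/methods inside DeadCompactionDetector.
     private final Set<TableId> tablesWithMissingExtents =
         ConcurrentHashMap.newKeySet();
   
     // Called from the existing periodic detectDeadCompactions() pass.
     private void cleanupTmpFilesForMissingExtents() throws InterruptedException {
       for (TableId tid : tablesWithMissingExtents) {
         Set<Path> matches = FindCompactionTmpFiles.findTempFiles(context, tid.canonical());
         FindCompactionTmpFiles.deleteTempFiles(context, matches);
         tablesWithMissingExtents.remove(tid);
       }
     }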





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1402683080


##########
test/src/main/java/org/apache/accumulo/test/functional/FindCompactionTmpFilesIT.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.tablets.TabletNameGenerator;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles;
+import org.apache.accumulo.server.util.FindCompactionTmpFiles.DeleteStats;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class FindCompactionTmpFilesIT extends SharedMiniClusterBase {

Review Comment:
   Tests added in a6eac57





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1396354344


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.net.HostAndPort;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+    @Parameter(names = "--delete")
+    boolean delete = false;
+  }
+
+  private static boolean allCompactorsDown(ClientContext context) {
+    // This is a copy of ExternalCompactionUtil.getCompactorAddrs that returns
+    // false if any compactor address is found. If there are no compactor addresses
+    // in any of the groups, then it returns true.
+    try {
+      final Map<String,List<HostAndPort>> groupsAndAddresses = new HashMap<>();
+      final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
+      ZooReader zooReader = context.getZooReader();
+      List<String> groups = zooReader.getChildren(compactorGroupsPath);
+      for (String group : groups) {
+        groupsAndAddresses.putIfAbsent(group, new ArrayList<>());
+        try {
+          List<String> compactors = zooReader.getChildren(compactorGroupsPath + "/" + group);
+          for (String compactor : compactors) {
+            // compactor is the address, we are checking to see if there is a child node which
+            // represents the compactor's lock as a check that it's alive.
+            List<String> children =
+                zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor);
+            if (!children.isEmpty()) {
+              LOG.trace("Found live compactor {} ", compactor);
+              return false;
+            }
+          }
+        } catch (NoNodeException e) {
+          LOG.trace("Ignoring node that went missing", e);
+        }
+      }
+      return true;
+    } catch (KeeperException e) {
+      throw new IllegalStateException(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static List<Path> findTempFiles(ServerContext context) throws InterruptedException {
+    final String pattern = "/tables/*/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files in volume: {} that match pattern: {}", vol, volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files =
+              vol.getFileSystem().globStatus(volPattern, (p) -> p.getName().contains("_tmp_ECID-"));
+          System.out.println(Arrays.toString(files));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, List<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> context.getVolumeManager().delete(p)));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }
+    delSvc.awaitTermination(10, TimeUnit.MINUTES);
+    return stats;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(FindCompactionTmpFiles.class.getName(), args);
+    LOG.info("Deleting compaction tmp files: {}", opts.delete);
+    Span span = TraceUtil.startSpan(FindCompactionTmpFiles.class, "main");
+    try (Scope scope = span.makeCurrent()) {
+      ServerContext context = opts.getServerContext();
+      if (!allCompactorsDown(context)) {
+        LOG.warn("Compactor addresses found in ZooKeeper. Unable to run this utility.");
+      }
+
+      final List<Path> matches = findTempFiles(context);
+      LOG.info("Found the following compaction tmp files:");
+      matches.forEach(p -> LOG.info("{}", p));
+

Review Comment:
   For filtering, could just use a scanner directly and only fetch the ECID column (don't need any other tablet columns like prev row, so don't need to filter at the row level for this case).  Would still need to use ample to read root table metadata.
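   
   A rough sketch of the scanner variant for the non-root levels (illustrative; the column family constant and the `fromJson` parser on the compaction metadata class are assumptions here, not verified against the branch):
   
     try (Scanner scanner = context.createScanner(level.metaTable(), Authorizations.EMPTY)) {
       // Fetch only the external compaction column; no other tablet
       // columns (like prev row) are needed for this filtering.
       scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
       for (Map.Entry<Key,Value> entry : scanner) {
         matches.remove(ExternalCompactionMetadata.fromJson(entry.getValue().toString())
             .getCompactTmpName().getPath());
       }
     } catch (TableNotFoundException e) {
       throw new IllegalStateException(e);
     }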





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1396346643


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.net.HostAndPort;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+    @Parameter(names = "--delete")
+    boolean delete = false;
+  }
+
+  private static boolean allCompactorsDown(ClientContext context) {
+    // This is a copy of ExternalCompactionUtil.getCompactorAddrs that returns
+    // false if any compactor address is found. If there are no compactor addresses
+    // in any of the groups, then it returns true.
+    try {
+      final Map<String,List<HostAndPort>> groupsAndAddresses = new HashMap<>();
+      final String compactorGroupsPath = context.getZooKeeperRoot() + Constants.ZCOMPACTORS;
+      ZooReader zooReader = context.getZooReader();
+      List<String> groups = zooReader.getChildren(compactorGroupsPath);
+      for (String group : groups) {
+        groupsAndAddresses.putIfAbsent(group, new ArrayList<>());
+        try {
+          List<String> compactors = zooReader.getChildren(compactorGroupsPath + "/" + group);
+          for (String compactor : compactors) {
+            // compactor is the address, we are checking to see if there is a child node which
+            // represents the compactor's lock as a check that it's alive.
+            List<String> children =
+                zooReader.getChildren(compactorGroupsPath + "/" + group + "/" + compactor);
+            if (!children.isEmpty()) {
+              LOG.trace("Found live compactor {} ", compactor);
+              return false;
+            }
+          }
+        } catch (NoNodeException e) {
+          LOG.trace("Ignoring node that went missing", e);
+        }
+      }
+      return true;
+    } catch (KeeperException e) {
+      throw new IllegalStateException(e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public static List<Path> findTempFiles(ServerContext context) throws InterruptedException {
+    final String pattern = "/tables/*/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);
+    final List<Future<Void>> futures = new ArrayList<>(vols.size());
+    for (Volume vol : vols) {
+      final Path volPattern = new Path(vol.getBasePath() + pattern);
+      LOG.info("Looking for tmp files in volume: {} that match pattern: {}", vol, volPattern);
+      futures.add(svc.submit(() -> {
+        try {
+          FileStatus[] files =
+              vol.getFileSystem().globStatus(volPattern, (p) -> p.getName().contains("_tmp_ECID-"));
+          System.out.println(Arrays.toString(files));
+          Arrays.stream(files).forEach(fs -> matches.add(fs.getPath()));
+        } catch (IOException e) {
+          LOG.error("Error looking for tmp files in volume: {}", vol, e);
+        }
+        return null;
+      }));
+    }
+    svc.shutdown();
+
+    while (futures.size() > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Void>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Void> future = iter.next();
+        if (future.isDone()) {
+          iter.remove();
+          try {
+            future.get();
+          } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException("Error getting list of tmp files", e);
+          }
+        }
+      }
+    }
+    svc.awaitTermination(10, TimeUnit.MINUTES);
+    return matches;
+  }
+
+  public static class DeleteStats {
+    public int success = 0;
+    public int failure = 0;
+    public int error = 0;
+  }
+
+  public static DeleteStats deleteTempFiles(ServerContext context, List<Path> filesToDelete)
+      throws InterruptedException {
+
+    final ExecutorService delSvc = Executors.newFixedThreadPool(8);
+    final List<Future<Boolean>> futures = new ArrayList<>(filesToDelete.size());
+    final DeleteStats stats = new DeleteStats();
+
+    filesToDelete.forEach(p -> {
+      futures.add(delSvc.submit(() -> context.getVolumeManager().delete(p)));
+    });
+    delSvc.shutdown();
+
+    int expectedResponses = filesToDelete.size();
+    while (expectedResponses > 0) {
+      UtilWaitThread.sleep(10_000);
+      Iterator<Future<Boolean>> iter = futures.iterator();
+      while (iter.hasNext()) {
+        Future<Boolean> future = iter.next();
+        if (future.isDone()) {
+          expectedResponses--;
+          iter.remove();
+          try {
+            if (future.get()) {
+              stats.success++;
+            } else {
+              stats.failure++;
+            }
+          } catch (ExecutionException e) {
+            stats.error++;
+            LOG.error("Error deleting a compaction tmp file", e);
+          }
+        }
+      }
+      LOG.debug("Waiting on {} responses", expectedResponses);
+    }
+    delSvc.awaitTermination(10, TimeUnit.MINUTES);
+    return stats;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(FindCompactionTmpFiles.class.getName(), args);
+    LOG.info("Deleting compaction tmp files: {}", opts.delete);
+    Span span = TraceUtil.startSpan(FindCompactionTmpFiles.class, "main");
+    try (Scope scope = span.makeCurrent()) {
+      ServerContext context = opts.getServerContext();
+      if (!allCompactorsDown(context)) {
+        LOG.warn("Compactor addresses found in ZooKeeper. Unable to run this utility.");
+      }
+
+      final List<Path> matches = findTempFiles(context);
+      LOG.info("Found the following compaction tmp files:");
+      matches.forEach(p -> LOG.info("{}", p));
+

Review Comment:
   Interesting thought. I didn't consider running this periodically so that things were eventually clean.





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "keith-turner (via GitHub)" <gi...@apache.org>.
keith-turner commented on PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#issuecomment-1815323752

   > Modified TabletNameGenerator.getNextDataFilenameForMajc to add the external compaction id to the filename.
   
   Putting the ECID at the end of the tmp file name is a really neat solution to this problem.




Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1396342341


##########
core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionId.java:
##########
@@ -67,4 +67,10 @@ public static ExternalCompactionId from(String ecid) {
     return of(ecid);
   }
 
+  public String encodeForFileName() {
+    // A colon in the file name causes issues in HDFS. Use a different character
+    // that will not be URLEncoded
+    return canonical().replace(':', '-');

Review Comment:
   We could do that. We just need to use something that will work in Hadoop filenames *and* won't be changed during URL encoding. For some reason URL encoding was happening twice, which led to issues. Changing the `:` to a `-` in the ECID prefix should work. A quick illustration of the round trip is below.
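   
   For example (illustrative snippet, not from the patch):
   
     // Encode for a filename: "ECID:<uuid>" -> "ECID-<uuid>", since a
     // colon in an HDFS path name causes problems.
     String fileSafe = ecid.canonical().replace(':', '-');
   
     // Any decode must restore only the prefix separator: the UUID part
     // itself contains '-' characters, so a blanket replace('-', ':')
     // would corrupt the id.
     String canonical = fileSafe.replaceFirst("^ECID-", "ECID:");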





Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "EdColeman (via GitHub)" <gi...@apache.org>.
EdColeman commented on PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#issuecomment-1816864280

   With something that is not a "directory per table" - do we need to be concerned with the absolute number of files that may end up there? Basically, running out of inodes or the equivalent? And/or are there performance considerations with large numbers of files in a single directory?




Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#issuecomment-1817093641

   > With something that is not a "directory per table" - do we need to be concerned with the absolute number of files that may end up there? Basically running out of indoes or the equivalent? And / or are there performance considerations with large numbers of files in a single directory?
   
   In the case where the value of the tmp dir is `file:///`, I don't think so. The Compactor would be writing one file for the current compaction, and would clean it up before it starts the next compaction. I don't know if there is an inode limitation for HDFS or S3.




Re: [PR] Cleanup failed compaction tmp files [accumulo]

Posted by "dlmarion (via GitHub)" <gi...@apache.org>.
dlmarion commented on code in PR #3955:
URL: https://github.com/apache/accumulo/pull/3955#discussion_r1400855215


##########
server/base/src/main/java/org/apache/accumulo/server/util/FindCompactionTmpFiles.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.volume.Volume;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.cli.ServerUtilOpts;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Scope;
+
+public class FindCompactionTmpFiles {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FindCompactionTmpFiles.class);
+
+  static class Opts extends ServerUtilOpts {
+
+    @Parameter(names = "--tables", description = "comma separated list of table names")
+    String tables;
+
+    @Parameter(names = "--delete", description = "if true, will delete tmp files")
+    boolean delete = false;
+  }
+
+  public static List<Path> findTempFiles(ServerContext context, String tableId)
+      throws InterruptedException {
+    String tablePattern = tableId != null ? tableId : "*";
+    final String pattern = "/tables/" + tablePattern + "/*/*";
+    final Collection<Volume> vols = context.getVolumeManager().getVolumes();
+    final ExecutorService svc = Executors.newFixedThreadPool(vols.size());
+    final List<Path> matches = new ArrayList<>(1024);

Review Comment:
   Made this a ConcurrentSkipListSet in 9f4315a


