You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/04/10 02:32:46 UTC

[accumulo] branch master updated: Fix #933 RuntimeException while stopping Accumulo (#1034)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new e60f552  Fix #933 RuntimeException while stopping Accumulo (#1034)
e60f552 is described below

commit e60f5523b2335bfb9ff5edad3904f89a3e2e4383
Author: Jeffrey Zeiberg <jz...@gmail.com>
AuthorDate: Tue Apr 9 22:32:42 2019 -0400

    Fix #933 RuntimeException while stopping Accumulo (#1034)
    
    Update commons-vfs and manage vfs resources a bit better.
---
 pom.xml                                            |  2 +-
 .../vfs/AccumuloReloadingVFSClassLoader.java       | 71 +++++++++++++++++++---
 2 files changed, 65 insertions(+), 8 deletions(-)

diff --git a/pom.xml b/pom.xml
index 1a59378..313577b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -431,7 +431,7 @@
         <groupId>org.apache.commons</groupId>
         <artifactId>commons-vfs2</artifactId>
         <!-- commons-vfs2 version 2.2 has defects that impacts changing Accumulo classpath contexts. -->
-        <version>2.1</version>
+        <version>2.3</version>
       </dependency>
       <dependency>
         <groupId>org.apache.curator</groupId>
diff --git a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java
index 0c72462..6dd5a6d 100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java
@@ -19,11 +19,14 @@ package org.apache.accumulo.start.classloader.vfs;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
 
 import org.apache.commons.vfs2.FileChangeEvent;
 import org.apache.commons.vfs2.FileListener;
@@ -100,12 +103,17 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC
               }
 
             }
+
             // There is a chance that the listener was removed from the top level directory or
             // its children if they were deleted within some time window. Re-add files to be
             // monitored. The Monitor will ignore files that are already/still being monitored.
-            for (FileObject file : files) {
-              monitor.addFile(file);
-            }
+            // forEachCatchRTEs will capture a stream of thrown exceptions.
+            // and can collect them to list or reduce into one exception
+
+            forEachCatchRTEs(Arrays.stream(files), o -> {
+              addFileToMonitor(o);
+              log.debug("monitoring {}", o);
+            });
           }
 
           log.debug("Rebuilding dynamic classloader using files- {}", stringify(files));
@@ -182,13 +190,56 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC
     monitor = new DefaultFileMonitor(this);
     monitor.setDelay(monitorDelay);
     monitor.setRecursive(false);
-    for (FileObject file : pathsToMonitor) {
-      monitor.addFile(file);
-      log.debug("monitoring {}", file);
-    }
+
+    forEachCatchRTEs(pathsToMonitor.stream(), o -> {
+      addFileToMonitor(o);
+      log.debug("monitoring {}", o);
+    });
+
     monitor.start();
   }
 
+  private void addFileToMonitor(FileObject file) throws RuntimeException {
+    try {
+      if (monitor != null)
+        monitor.addFile(file);
+    } catch (RuntimeException re) {
+      if (re.getMessage().contains("files-cache"))
+        log.error("files-cache error adding {} to VFS monitor. "
+            + "There is no implementation for files-cache in VFS2", file, re);
+      else
+        log.error("Runtime error adding {} to VFS monitor", file, re);
+
+      throw re;
+    }
+  }
+
+  private void removeFile(FileObject file) throws RuntimeException {
+    try {
+      if (monitor != null)
+        monitor.removeFile(file);
+    } catch (RuntimeException re) {
+      log.error("Error removing file from VFS cache {}", file, re);
+      throw re;
+    }
+  }
+
+  public static <T> void forEachCatchRTEs(Stream<T> stream, Consumer<T> consumer) {
+    stream.flatMap(o -> {
+      try {
+        consumer.accept(o);
+        return null;
+      } catch (RuntimeException e) {
+        return Stream.of(e);
+      }
+    }).reduce((e1, e2) -> {
+      e1.addSuppressed(e2);
+      return e1;
+    }).ifPresent(e -> {
+      throw e;
+    });
+  }
+
   public AccumuloReloadingVFSClassLoader(String uris, FileSystemManager vfs,
       final ReloadingClassLoader parent, boolean preDelegate) throws FileSystemException {
     this(uris, vfs, parent, DEFAULT_TIMEOUT, preDelegate);
@@ -199,6 +250,12 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC
    * thread
    */
   public void close() {
+
+    forEachCatchRTEs(Stream.of(files), o -> {
+      removeFile(o);
+      log.debug("Removing file from monitoring {}", o);
+    });
+
     executor.shutdownNow();
     monitor.stop();
   }