You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/04/10 02:32:46 UTC
[accumulo] branch master updated: Fix #933 RuntimeException while
stopping Accumulo (#1034)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new e60f552 Fix #933 RuntimeException while stopping Accumulo (#1034)
e60f552 is described below
commit e60f5523b2335bfb9ff5edad3904f89a3e2e4383
Author: Jeffrey Zeiberg <jz...@gmail.com>
AuthorDate: Tue Apr 9 22:32:42 2019 -0400
Fix #933 RuntimeException while stopping Accumulo (#1034)
Update commons-vfs and manage vfs resources a bit better.
---
pom.xml | 2 +-
.../vfs/AccumuloReloadingVFSClassLoader.java | 71 +++++++++++++++++++---
2 files changed, 65 insertions(+), 8 deletions(-)
diff --git a/pom.xml b/pom.xml
index 1a59378..313577b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -431,7 +431,7 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<!-- commons-vfs2 version 2.2 has defects that impact changing Accumulo classpath contexts. -->
- <version>2.1</version>
+ <version>2.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
diff --git a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java
index 0c72462..6dd5a6d 100644
--- a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java
+++ b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloReloadingVFSClassLoader.java
@@ -19,11 +19,14 @@ package org.apache.accumulo.start.classloader.vfs;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
import org.apache.commons.vfs2.FileChangeEvent;
import org.apache.commons.vfs2.FileListener;
@@ -100,12 +103,17 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC
}
}
+
// There is a chance that the listener was removed from the top level directory or
// its children if they were deleted within some time window. Re-add files to be
// monitored. The Monitor will ignore files that are already/still being monitored.
- for (FileObject file : files) {
- monitor.addFile(file);
- }
+ // forEachCatchRTEs runs the action on every file even when some fail, then
+ // rethrows the first RuntimeException with any later ones added as suppressed.
+
+ forEachCatchRTEs(Arrays.stream(files), o -> {
+ addFileToMonitor(o);
+ log.debug("monitoring {}", o);
+ });
}
log.debug("Rebuilding dynamic classloader using files- {}", stringify(files));
@@ -182,13 +190,56 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC
monitor = new DefaultFileMonitor(this);
monitor.setDelay(monitorDelay);
monitor.setRecursive(false);
- for (FileObject file : pathsToMonitor) {
- monitor.addFile(file);
- log.debug("monitoring {}", file);
- }
+
+ forEachCatchRTEs(pathsToMonitor.stream(), o -> {
+ addFileToMonitor(o);
+ log.debug("monitoring {}", o);
+ });
+
monitor.start();
}
+  /**
+   * Registers {@code file} with the VFS file monitor, if a monitor exists.
+   *
+   * <p>
+   * Any {@link RuntimeException} raised by the monitor is logged and then rethrown unchanged so
+   * the caller can decide how to proceed.
+   *
+   * @param file
+   *          the file or directory to start monitoring
+   * @throws RuntimeException
+   *           whatever the monitor threw while adding the file
+   */
+  private void addFileToMonitor(FileObject file) throws RuntimeException {
+    if (monitor == null) {
+      return;
+    }
+    try {
+      monitor.addFile(file);
+    } catch (RuntimeException re) {
+      // getMessage() may be null (e.g. a bare NullPointerException); guard it so that the
+      // logging code never masks the original failure with a second exception.
+      String message = re.getMessage();
+      if (message != null && message.contains("files-cache")) {
+        log.error("files-cache error adding {} to VFS monitor. "
+            + "There is no implementation for files-cache in VFS2", file, re);
+      } else {
+        log.error("Runtime error adding {} to VFS monitor", file, re);
+      }
+      throw re;
+    }
+  }
+
+  /**
+   * Stops the VFS monitor, when one is present, from watching {@code file}. A
+   * {@link RuntimeException} raised by the monitor is logged and rethrown to the caller.
+   *
+   * @param file
+   *          the file or directory to stop monitoring
+   * @throws RuntimeException
+   *           whatever the monitor threw while removing the file
+   */
+  private void removeFile(FileObject file) throws RuntimeException {
+    if (monitor == null) {
+      return;
+    }
+    try {
+      monitor.removeFile(file);
+    } catch (RuntimeException failure) {
+      log.error("Error removing file from VFS cache {}", file, failure);
+      throw failure;
+    }
+  }
+
+  /**
+   * Applies {@code consumer} to every element of {@code stream}, in encounter order, without
+   * stopping at the first failure.
+   *
+   * <p>
+   * Each {@link RuntimeException} thrown by the consumer is captured. Once all elements have
+   * been processed, the first captured exception is rethrown with every later, distinct
+   * exception attached via {@link Throwable#addSuppressed(Throwable)}. If nothing failed the
+   * method returns normally.
+   *
+   * @param <T>
+   *          element type of the stream
+   * @param stream
+   *          elements to process; consumed by this call
+   * @param consumer
+   *          action to run on each element
+   * @throws RuntimeException
+   *           the first exception thrown by {@code consumer}, if any
+   */
+  public static <T> void forEachCatchRTEs(Stream<T> stream, Consumer<? super T> consumer) {
+    // Single-slot holder so the lambda can record the first failure.
+    final RuntimeException[] first = new RuntimeException[1];
+    stream.forEachOrdered(item -> {
+      try {
+        consumer.accept(item);
+      } catch (RuntimeException e) {
+        if (first[0] == null) {
+          first[0] = e;
+        } else if (first[0] != e) {
+          // addSuppressed rejects self-suppression with IllegalArgumentException, which would
+          // hide the real error if the consumer rethrows one shared exception instance.
+          first[0].addSuppressed(e);
+        }
+      }
+    });
+    if (first[0] != null) {
+      throw first[0];
+    }
+  }
+
public AccumuloReloadingVFSClassLoader(String uris, FileSystemManager vfs,
final ReloadingClassLoader parent, boolean preDelegate) throws FileSystemException {
this(uris, vfs, parent, DEFAULT_TIMEOUT, preDelegate);
@@ -199,6 +250,12 @@ public class AccumuloReloadingVFSClassLoader implements FileListener, ReloadingC
* thread
*/
public void close() {
+    // Stop watching every file, but always shut down the executor and the monitor afterwards:
+    // if removing a file throws, they would otherwise never be stopped.
+    try {
+      forEachCatchRTEs(Stream.of(files), o -> {
+        removeFile(o);
+        log.debug("Removing file from monitoring {}", o);
+      });
+    } finally {
- executor.shutdownNow();
- monitor.stop();
+      executor.shutdownNow();
+      // Guarded like addFileToMonitor/removeFile, which also tolerate a missing monitor.
+      if (monitor != null) {
+        monitor.stop();
+      }
+    }
}