You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/07/04 16:55:16 UTC
[2/6] nifi-minifi git commit: MINIFI-38 Removing reliance on
JettyServer in order to add a flow status reporting end point. This also
removes the UI and adds a 'flowstatus' option to minifi.sh to get information
on the current flow from the terminal.
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
new file mode 100644
index 0000000..a4525b0
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoaders.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.jar.Attributes;
+import java.util.jar.Manifest;
+import org.apache.nifi.util.FileUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public final class NarClassLoaders {
+
+ public static final String FRAMEWORK_NAR_ID = "minifi-framework-nar";
+
+ private static final Logger logger = LoggerFactory.getLogger(NarClassLoaders.class);
+ private static final AtomicBoolean initialized = new AtomicBoolean(false);
+ private static final AtomicReference<Map<String, ClassLoader>> extensionClassLoaders = new AtomicReference<>();
+ private static final AtomicReference<ClassLoader> frameworkClassLoader = new AtomicReference<>();
+
+ /**
+ * Loads the extensions class loaders from the specified working directory.
+ * Loading is only performed during the initial invocation of load.
+ * Subsequent attempts will be ignored.
+ *
+ *
+ * @param properties properties object to initialize with
+ * @throws java.io.IOException ioe
+ * @throws java.lang.ClassNotFoundException cfne
+ * @throws IllegalStateException if the class loaders have already been
+ * created
+ */
+ public static void load(final NiFiProperties properties) throws IOException, ClassNotFoundException {
+ if (initialized.getAndSet(true)) {
+ throw new IllegalStateException("Extensions class loaders have already been loaded.");
+ }
+
+ // get the system classloader
+ final ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
+
+ // get the current context class loader
+ ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
+
+ // find all nar files and create class loaders for them.
+ final Map<String, ClassLoader> extensionDirectoryClassLoaderLookup = new LinkedHashMap<>();
+ final Map<String, ClassLoader> narIdClassLoaderLookup = new HashMap<>();
+
+ final File frameworkWorkingDirectory = properties.getFrameworkWorkingDirectory();
+ final File extensionsWorkingDirectory = properties.getExtensionsWorkingDirectory();
+
+ // make sure the nar directory is there and accessible
+ FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDirectory);
+ FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDirectory);
+
+ final List<File> narWorkingDirContents = new ArrayList<>();
+ final File[] frameworkWorkingDirContents = frameworkWorkingDirectory.listFiles();
+ if (frameworkWorkingDirContents != null) {
+ narWorkingDirContents.addAll(Arrays.asList(frameworkWorkingDirContents));
+ }
+ final File[] extensionsWorkingDirContents = extensionsWorkingDirectory.listFiles();
+ if (extensionsWorkingDirContents != null) {
+ narWorkingDirContents.addAll(Arrays.asList(extensionsWorkingDirContents));
+ }
+
+ if (!narWorkingDirContents.isEmpty()) {
+ final List<NarDetails> narDetails = new ArrayList<>();
+
+ // load the nar details which includes and nar dependencies
+ for (final File unpackedNar : narWorkingDirContents) {
+ final NarDetails narDetail = getNarDetails(unpackedNar);
+
+ // ensure the nar contained an identifier
+ if (narDetail.getNarId() == null) {
+ logger.warn("No NAR Id found. Skipping: " + unpackedNar.getAbsolutePath());
+ continue;
+ }
+
+ // store the nar details
+ narDetails.add(narDetail);
+ }
+
+ int narCount;
+ do {
+ // record the number of nars to be loaded
+ narCount = narDetails.size();
+
+ // attempt to create each nar class loader
+ for (final Iterator<NarDetails> narDetailsIter = narDetails.iterator(); narDetailsIter.hasNext();) {
+ final NarDetails narDetail = narDetailsIter.next();
+ final String narDependencies = narDetail.getNarDependencyId();
+
+ // see if this class loader is eligible for loading
+ ClassLoader narClassLoader = null;
+ if (narDependencies == null) {
+ narClassLoader = createNarClassLoader(narDetail.getNarWorkingDirectory(), currentContextClassLoader);
+ } else if (narIdClassLoaderLookup.containsKey(narDetail.getNarDependencyId())) {
+ narClassLoader = createNarClassLoader(narDetail.getNarWorkingDirectory(), narIdClassLoaderLookup.get(narDetail.getNarDependencyId()));
+ }
+
+ // if we were able to create the nar class loader, store it and remove the details
+ if (narClassLoader != null) {
+ extensionDirectoryClassLoaderLookup.put(narDetail.getNarWorkingDirectory().getCanonicalPath(), narClassLoader);
+ narIdClassLoaderLookup.put(narDetail.getNarId(), narClassLoader);
+ narDetailsIter.remove();
+ }
+ }
+
+ // attempt to load more if some were successfully loaded this iteration
+ } while (narCount != narDetails.size());
+
+ // see if any nars couldn't be loaded
+ for (final NarDetails narDetail : narDetails) {
+ logger.warn(String.format("Unable to resolve required dependency '%s'. Skipping NAR %s", narDetail.getNarDependencyId(), narDetail.getNarWorkingDirectory().getAbsolutePath()));
+ }
+ }
+
+ // set the framework class loader
+ frameworkClassLoader.set(narIdClassLoaderLookup.get(FRAMEWORK_NAR_ID));
+
+ // set the extensions class loader map
+ extensionClassLoaders.set(new LinkedHashMap<>(extensionDirectoryClassLoaderLookup));
+ }
+
+    /**
+     * Creates a new NarClassLoader for an unpacked NAR and logs the outcome.
+     * The parentClassLoader may be null.
+     *
+     * @param narDirectory root directory of nar
+     * @param parentClassLoader parent classloader of nar
+     * @return the nar classloader
+     * @throws IOException ioe
+     * @throws ClassNotFoundException cfne
+     */
+    private static ClassLoader createNarClassLoader(final File narDirectory, final ClassLoader parentClassLoader) throws IOException, ClassNotFoundException {
+        // parameterized messages are only assembled when the level is enabled
+        logger.debug("Loading NAR file: {}", narDirectory.getAbsolutePath());
+        final ClassLoader narClassLoader = new NarClassLoader(narDirectory, parentClassLoader);
+        logger.info("Loaded NAR file: {} as class loader {}", narDirectory.getAbsolutePath(), narClassLoader);
+        return narClassLoader;
+    }
+
+    /**
+     * Reads the identifying attributes of an unpacked NAR from the
+     * META-INF/MANIFEST.MF file found below the given directory.
+     *
+     * @param narDirectory the nar directory
+     * @return details holding the directory together with the Nar-Id and
+     * Nar-Dependency-Id manifest values (each null when absent)
+     * @throws IOException if the manifest cannot be opened or parsed
+     */
+    private static NarDetails getNarDetails(final File narDirectory) throws IOException {
+        final Attributes mainAttributes;
+        try (final FileInputStream manifestStream = new FileInputStream(new File(narDirectory, "META-INF/MANIFEST.MF"))) {
+            mainAttributes = new Manifest(manifestStream).getMainAttributes();
+        }
+
+        final NarDetails details = new NarDetails();
+        details.setNarWorkingDirectory(narDirectory);
+        details.setNarId(mainAttributes.getValue("Nar-Id"));
+        details.setNarDependencyId(mainAttributes.getValue("Nar-Dependency-Id"));
+        return details;
+    }
+
+    /**
+     * @return the class loader registered for the framework NAR; may be null
+     * when no NAR with the framework id was loaded
+     *
+     * @throws IllegalStateException if load has not been invoked yet
+     */
+    public static ClassLoader getFrameworkClassLoader() {
+        if (initialized.get()) {
+            return frameworkClassLoader.get();
+        }
+
+        throw new IllegalStateException("Framework class loader has not been loaded.");
+    }
+
+    /**
+     * Looks up the class loader created for an unpacked NAR directory.
+     *
+     * @param extensionWorkingDirectory the directory
+     * @return the class loader for the specified working directory. Returns
+     * null when no class loader exists for the specified working directory or
+     * when the canonical path of the directory cannot be determined
+     * @throws IllegalStateException if the class loaders have not been loaded
+     */
+    public static ClassLoader getExtensionClassLoader(final File extensionWorkingDirectory) {
+        if (!initialized.get()) {
+            throw new IllegalStateException("Extensions class loaders have not been loaded.");
+        }
+
+        // load() flips the initialized flag before it publishes the map, so the
+        // map is still absent while loading is in progress or after it failed
+        final Map<String, ClassLoader> classLoaders = extensionClassLoaders.get();
+        if (classLoaders == null) {
+            return null;
+        }
+
+        try {
+            return classLoaders.get(extensionWorkingDirectory.getCanonicalPath());
+        } catch (final IOException ioe) {
+            // keep the documented null result, but leave a trace of the cause
+            logger.warn("Unable to determine canonical path of {}; no class loader returned", extensionWorkingDirectory, ioe);
+            return null;
+        }
+    }
+
+    /**
+     * @return a new set holding the extension class loaders in the order in
+     * which they were created; empty when none have been published
+     * @throws IllegalStateException if the class loaders have not been loaded
+     */
+    public static Set<ClassLoader> getExtensionClassLoaders() {
+        if (!initialized.get()) {
+            throw new IllegalStateException("Extensions class loaders have not been loaded.");
+        }
+
+        // load() flips the initialized flag before it publishes the map, so the
+        // map is still absent while loading is in progress or after it failed
+        final Map<String, ClassLoader> classLoaders = extensionClassLoaders.get();
+        if (classLoaders == null) {
+            return new LinkedHashSet<>();
+        }
+
+        return new LinkedHashSet<>(classLoaders.values());
+    }
+
+    /**
+     * Mutable holder for what is known about one unpacked NAR: its working
+     * directory plus the id and dependency id read from its manifest.
+     */
+    private static class NarDetails {
+
+        private File narWorkingDirectory;
+        private String narId;
+        private String narDependencyId;
+
+        public File getNarWorkingDirectory() {
+            return this.narWorkingDirectory;
+        }
+
+        public void setNarWorkingDirectory(File narWorkingDirectory) {
+            this.narWorkingDirectory = narWorkingDirectory;
+        }
+
+        public String getNarId() {
+            return this.narId;
+        }
+
+        public void setNarId(String narId) {
+            this.narId = narId;
+        }
+
+        public String getNarDependencyId() {
+            return this.narDependencyId;
+        }
+
+        public void setNarDependencyId(String narDependencyId) {
+            this.narDependencyId = narDependencyId;
+        }
+    }
+
+ private NarClassLoaders() { // private so that this class, which only offers static members, cannot be instantiated
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
new file mode 100644
index 0000000..c0b43dc
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarCloseable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import java.io.Closeable;
+
+/**
+ * Handle that installs the {@link NarThreadContextClassLoader} as the context
+ * class loader of the current thread and, when closed, puts back the class
+ * loader that was in place before. A previous class loader of null is not put
+ * back.
+ */
+public class NarCloseable implements Closeable {
+
+    /** Context class loader to reinstate on close; may be null. */
+    private final ClassLoader toSet;
+
+    private NarCloseable(final ClassLoader toSet) {
+        this.toSet = toSet;
+    }
+
+    /**
+     * Switches the current thread over to the NAR thread context class loader.
+     *
+     * @return a handle whose close() reinstates the previous context class loader
+     */
+    public static NarCloseable withNarLoader() {
+        final Thread thread = Thread.currentThread();
+        final ClassLoader previous = thread.getContextClassLoader();
+        thread.setContextClassLoader(NarThreadContextClassLoader.getInstance());
+        return new NarCloseable(previous);
+    }
+
+    /**
+     * Reinstates the remembered class loader on the thread that calls close.
+     */
+    @Override
+    public void close() {
+        if (toSet == null) {
+            return;
+        }
+        Thread.currentThread().setContextClassLoader(toSet);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
new file mode 100644
index 0000000..c3b17fd
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import org.apache.nifi.authentication.LoginIdentityProvider;
+import org.apache.nifi.authorization.AuthorityProvider;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.reporting.ReportingTask;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.List;
+
+/**
+ * THREAD SAFE
+ */
+public class NarThreadContextClassLoader extends URLClassLoader {
+
+ static final ContextSecurityManager contextSecurityManager = new ContextSecurityManager();
+ private final ClassLoader forward = ClassLoader.getSystemClassLoader();
+ private static final List<Class<?>> narSpecificClasses = new ArrayList<>();
+
+ static { // registers the types that findNarClass() tests the classes on the call stack against
+ narSpecificClasses.add(Processor.class);
+ narSpecificClasses.add(FlowFilePrioritizer.class);
+ narSpecificClasses.add(ReportingTask.class);
+ narSpecificClasses.add(Validator.class);
+ narSpecificClasses.add(InputStreamCallback.class);
+ narSpecificClasses.add(OutputStreamCallback.class);
+ narSpecificClasses.add(StreamCallback.class);
+ narSpecificClasses.add(ControllerService.class);
+ narSpecificClasses.add(AuthorityProvider.class);
+ narSpecificClasses.add(LoginIdentityProvider.class);
+ narSpecificClasses.add(ProvenanceEventRepository.class);
+ narSpecificClasses.add(ComponentStatusRepository.class);
+ narSpecificClasses.add(FlowFileRepository.class);
+ narSpecificClasses.add(FlowFileSwapManager.class);
+ narSpecificClasses.add(ContentRepository.class);
+ }
+
+ private NarThreadContextClassLoader() { // private: the only instance is the one held by SingletonHolder
+ super(new URL[0]); // no URLs of its own; the overridden methods delegate to the loader chosen by lookupClassLoader()
+ }
+
+    // Each override below asks lookupClassLoader() for the class loader that
+    // matches the current call stack and passes the request on to it.
+
+    @Override
+    public void clearAssertionStatus() {
+        final ClassLoader delegate = lookupClassLoader();
+        delegate.clearAssertionStatus();
+    }
+
+    @Override
+    public URL getResource(String name) {
+        final ClassLoader delegate = lookupClassLoader();
+        return delegate.getResource(name);
+    }
+
+    @Override
+    public InputStream getResourceAsStream(String name) {
+        final ClassLoader delegate = lookupClassLoader();
+        return delegate.getResourceAsStream(name);
+    }
+
+    @Override
+    public Enumeration<URL> getResources(String name) throws IOException {
+        final ClassLoader delegate = lookupClassLoader();
+        return delegate.getResources(name);
+    }
+
+    @Override
+    public Class<?> loadClass(String name) throws ClassNotFoundException {
+        final ClassLoader delegate = lookupClassLoader();
+        return delegate.loadClass(name);
+    }
+
+    @Override
+    public void setClassAssertionStatus(String className, boolean enabled) {
+        final ClassLoader delegate = lookupClassLoader();
+        delegate.setClassAssertionStatus(className, enabled);
+    }
+
+    @Override
+    public void setDefaultAssertionStatus(boolean enabled) {
+        final ClassLoader delegate = lookupClassLoader();
+        delegate.setDefaultAssertionStatus(enabled);
+    }
+
+    @Override
+    public void setPackageAssertionStatus(String packageName, boolean enabled) {
+        final ClassLoader delegate = lookupClassLoader();
+        delegate.setPackageAssertionStatus(packageName, enabled);
+    }
+
+    /**
+     * Picks the class loader to delegate to by scanning the classes on the
+     * current execution stack for the first one that findNarClass() accepts.
+     *
+     * @return the class loader of the matching class, or the system class
+     * loader when nothing on the stack matches
+     */
+    private ClassLoader lookupClassLoader() {
+        for (final Class<?> stackClass : contextSecurityManager.getExecutionStack()) {
+            final Class<?> narClass = findNarClass(stackClass);
+            if (narClass == null) {
+                continue;
+            }
+
+            final ClassLoader narLoader = narClass.getClassLoader();
+
+            // A newly created Thread inherits the context class loader of its creator, yet its
+            // call stack may never lead back to NiFi-specific code, in which case this class
+            // could not work out the right NAR ClassLoader for it. The current Thread is
+            // therefore pinned to the NAR ClassLoader that was found. The framework switches the
+            // context class loader back to the NarThreadContextClassLoader where appropriate via
+            // {@link FlowEngine.beforeExecute(Thread, Runnable)} and
+            // {@link FlowEngine.afterExecute(Thread, Runnable)}.
+            if (narLoader instanceof NarClassLoader) {
+                Thread.currentThread().setContextClassLoader(narLoader);
+            }
+            return narLoader;
+        }
+
+        return forward;
+    }
+
+    /**
+     * Finds the class on which a class loader lookup should be based: the given
+     * class itself when it is assignable to one of the NAR-specific types,
+     * otherwise the nearest enclosing class that is.
+     *
+     * @param cls a class taken from the current execution stack
+     * @return cls or one of its enclosing classes, or null when none of them is
+     * assignable to any NAR-specific type
+     */
+    private Class<?> findNarClass(final Class<?> cls) {
+        // Walk outwards through the enclosing classes and compare every candidate with ALL of
+        // the NAR-specific types before moving on. Moving outwards after a mismatch with only
+        // the first type would miss nested classes that implement one of the other types.
+        for (Class<?> candidate = cls; candidate != null; candidate = candidate.getEnclosingClass()) {
+            for (final Class<?> narClass : narSpecificClasses) {
+                if (narClass.isAssignableFrom(candidate)) {
+                    return candidate;
+                }
+            }
+        }
+
+        return null;
+    }
+
+ private static class SingletonHolder { // initialization-on-demand holder for the single instance
+
+ public static final NarThreadContextClassLoader instance = new NarThreadContextClassLoader();
+ }
+
+ public static NarThreadContextClassLoader getInstance() { // always returns the instance held by SingletonHolder
+ return SingletonHolder.instance;
+ }
+
+ static class ContextSecurityManager extends SecurityManager { // subclassed to reach SecurityManager's protected getClassContext()
+
+ Class<?>[] getExecutionStack() { // the classes on the calling thread's execution stack, as reported by getClassContext()
+ return getClassContext();
+ }
+ }
+
+    /**
+     * Instantiates the named implementation class through its no-argument
+     * constructor. The class is loaded with the class loader that the
+     * ExtensionManager has registered for it or, when none is registered, with
+     * the class loader that loaded this class.
+     * <p>
+     * While the instance is being created, the context class loader of the
+     * current thread is the registered class loader (null when none is
+     * registered); the original context class loader is restored afterwards.
+     *
+     * @param implementationClassName fully qualified name of the class to instantiate
+     * @param typeDefinition type the new instance must be assignable to
+     * @param <T> the type of the returned instance
+     * @return the new instance
+     * @throws InstantiationException if the class cannot be instantiated
+     * @throws IllegalAccessException if the class or its constructor is not accessible
+     * @throws ClassNotFoundException if the class cannot be found
+     * @throws ClassCastException if the class is not a subclass of typeDefinition
+     */
+    public static <T> T createInstance(final String implementationClassName, final Class<T> typeDefinition) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
+        final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(NarThreadContextClassLoader.getInstance());
+        try {
+            final ClassLoader detectedClassLoaderForType = ExtensionManager.getClassLoader(implementationClassName);
+            final Class<?> rawClass;
+            if (detectedClassLoaderForType == null) {
+                // try to find from the current class loader
+                rawClass = Class.forName(implementationClassName);
+            } else {
+                // load from the class loader registered for that type, reusing the
+                // result of the lookup above rather than asking a second time
+                rawClass = Class.forName(implementationClassName, true, detectedClassLoaderForType);
+            }
+
+            Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
+            final Class<? extends T> desiredClass = rawClass.asSubclass(typeDefinition);
+            return desiredClass.newInstance();
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
new file mode 100644
index 0000000..2af1090
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/nar/NarUnpacker.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import org.apache.nifi.util.FileUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.jar.Attributes;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+/**
+ *
+ */
+public final class NarUnpacker {
+
+ private static final Logger logger = LoggerFactory.getLogger(org.apache.nifi.nar.NarUnpacker.class);
+ private static String HASH_FILENAME = "nar-md5sum";
+ private static final FileFilter NAR_FILTER = new FileFilter() { // accepts regular files whose name ends in .nar, ignoring case
+ @Override
+ public boolean accept(File pathname) {
+ final String nameToTest = pathname.getName().toLowerCase(); // lower-cased so that e.g. ".NAR" matches as well
+ return nameToTest.endsWith(".nar") && pathname.isFile();
+ }
+ };
+
+    /**
+     * Unpacks every NAR found in the configured NAR library directories into
+     * the framework or extensions working directory, removes working
+     * directories that do not belong to a NAR unpacked by this call, clears
+     * the documentation working directory and maps the extensions found in the
+     * unpacked jars.
+     * <p>
+     * A NAR file without a manifest is skipped with a warning.
+     *
+     * @param props properties supplying the NAR library directories and the
+     * framework, extensions and documentation working directories
+     * @return the mapping of the discovered extensions, or null when an
+     * IOException prevented the NARs from being unpacked
+     * @throws IllegalStateException if NAR files were found but none or more
+     * than one of them is the framework NAR, or the unpacked framework cannot
+     * be read
+     */
+    public static ExtensionMapping unpackNars(final NiFiProperties props) {
+        final List<Path> narLibraryDirs = props.getNarLibraryDirectories();
+        final File frameworkWorkingDir = props.getFrameworkWorkingDirectory();
+        final File extensionsWorkingDir = props.getExtensionsWorkingDirectory();
+        final File docsWorkingDir = props.getComponentDocumentationWorkingDirectory();
+
+        try {
+            File unpackedFramework = null;
+            final Set<File> unpackedExtensions = new HashSet<>();
+            final List<File> narFiles = new ArrayList<>();
+
+            // make sure the nar directories are there and accessible
+            FileUtils.ensureDirectoryExistAndCanAccess(frameworkWorkingDir);
+            FileUtils.ensureDirectoryExistAndCanAccess(extensionsWorkingDir);
+            FileUtils.ensureDirectoryExistAndCanAccess(docsWorkingDir);
+
+            // collect the nar files of every library directory
+            for (final Path narLibraryDir : narLibraryDirs) {
+                final File narDir = narLibraryDir.toFile();
+                FileUtils.ensureDirectoryExistAndCanAccess(narDir);
+
+                final File[] dirFiles = narDir.listFiles(NAR_FILTER);
+                if (dirFiles != null) {
+                    narFiles.addAll(Arrays.asList(dirFiles));
+                }
+            }
+
+            if (!narFiles.isEmpty()) {
+                for (final File narFile : narFiles) {
+                    logger.debug("Expanding NAR file: {}", narFile.getAbsolutePath());
+
+                    // lookup the nar id; the jar is closed again before the nar is unpacked
+                    final String narId;
+                    try (final JarFile nar = new JarFile(narFile)) {
+                        final Manifest manifest = nar.getManifest();
+                        if (manifest == null) {
+                            // getManifest() returns null for an archive without a manifest
+                            logger.warn("No manifest found in NAR file {}. Skipping.", narFile.getAbsolutePath());
+                            continue;
+                        }
+                        narId = manifest.getMainAttributes().getValue("Nar-Id");
+                    }
+
+                    // determine if this is the framework
+                    if (NarClassLoaders.FRAMEWORK_NAR_ID.equals(narId)) {
+                        if (unpackedFramework != null) {
+                            throw new IllegalStateException(
+                                    "Multiple framework NARs discovered. Only one framework is permitted.");
+                        }
+
+                        unpackedFramework = unpackNar(narFile, frameworkWorkingDir);
+                    } else {
+                        unpackedExtensions.add(unpackNar(narFile, extensionsWorkingDir));
+                    }
+                }
+
+                // ensure we've found the framework nar
+                if (unpackedFramework == null) {
+                    throw new IllegalStateException("No framework NAR found.");
+                } else if (!unpackedFramework.canRead()) {
+                    throw new IllegalStateException("Framework NAR cannot be read.");
+                }
+
+                // Determine if any nars no longer exist and delete their working directories.
+                // This happens if a new version of a nar is dropped into the lib dir.
+                // ensure no old framework are present
+                final File[] frameworkWorkingDirContents = frameworkWorkingDir.listFiles();
+                if (frameworkWorkingDirContents != null) {
+                    for (final File unpackedNar : frameworkWorkingDirContents) {
+                        if (!unpackedFramework.equals(unpackedNar)) {
+                            FileUtils.deleteFile(unpackedNar, true);
+                        }
+                    }
+                }
+
+                // ensure no old extensions are present
+                final File[] extensionsWorkingDirContents = extensionsWorkingDir.listFiles();
+                if (extensionsWorkingDirContents != null) {
+                    for (final File unpackedNar : extensionsWorkingDirContents) {
+                        if (!unpackedExtensions.contains(unpackedNar)) {
+                            FileUtils.deleteFile(unpackedNar, true);
+                        }
+                    }
+                }
+            }
+
+            // attempt to delete any docs files that exist so that any components that have been
+            // removed will no longer have entries in the docs folder
+            final File[] docsFiles = docsWorkingDir.listFiles();
+            if (docsFiles != null) {
+                for (final File file : docsFiles) {
+                    FileUtils.deleteFile(file, true);
+                }
+            }
+
+            final ExtensionMapping extensionMapping = new ExtensionMapping();
+            mapExtensions(extensionsWorkingDir, docsWorkingDir, extensionMapping);
+            return extensionMapping;
+        } catch (IOException e) {
+            logger.warn("Unable to load NAR library bundles due to " + e
+                    + " Will proceed without loading any further Nar bundles");
+            if (logger.isDebugEnabled()) {
+                // the stack trace is only of interest when debugging
+                logger.warn("", e);
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Walks the given directory tree and hands every file whose name ends in
+     * .jar (ignoring case) to unpackDocumentation.
+     *
+     * @param workingDirectory the directory to walk
+     * @param docsDirectory the directory documentation is unpacked to
+     * @param mapping the mapping passed on to unpackDocumentation
+     * @throws IOException if unpacking the documentation of a jar fails
+     */
+    private static void mapExtensions(final File workingDirectory, final File docsDirectory,
+            final ExtensionMapping mapping) throws IOException {
+        final File[] children = workingDirectory.listFiles();
+        if (children == null) {
+            return;
+        }
+
+        for (final File child : children) {
+            if (child.isDirectory()) {
+                mapExtensions(child, docsDirectory, mapping);
+                continue;
+            }
+
+            final boolean isJar = child.getName().toLowerCase().endsWith(".jar");
+            if (isJar) {
+                unpackDocumentation(child, docsDirectory, mapping);
+            }
+        }
+    }
+
+    /**
+     * Makes sure that an unpacked copy of the given nar matching its current
+     * checksum exists below the base working directory. The nar is unpacked
+     * when its working directory does not exist yet, and unpacked afresh when
+     * the working directory has no hash file or the stored hash differs from
+     * the checksum of the nar.
+     *
+     * @param nar
+     *            the nar to unpack
+     * @param baseWorkingDirectory
+     *            the directory to unpack to
+     * @return the directory to the unpacked NAR
+     * @throws IOException
+     *             if unable to explode nar
+     */
+    private static File unpackNar(final File nar, final File baseWorkingDirectory)
+            throws IOException {
+        final File unpackedDir = new File(baseWorkingDirectory, nar.getName() + "-unpacked");
+        final byte[] currentMd5 = calculateMd5sum(nar);
+
+        // nothing unpacked so far
+        if (!unpackedDir.exists()) {
+            unpack(nar, unpackedDir, currentMd5);
+            return unpackedDir;
+        }
+
+        // an unpacked copy exists; keep it only when its recorded hash still matches
+        final File storedHashFile = new File(unpackedDir, HASH_FILENAME);
+        final boolean hashRecorded = storedHashFile.exists();
+        if (hashRecorded && Arrays.equals(Files.readAllBytes(storedHashFile.toPath()), currentMd5)) {
+            return unpackedDir;
+        }
+
+        if (hashRecorded) {
+            logger.info("Contents of nar {} have changed. Reloading.",
+                    new Object[] { nar.getAbsolutePath() });
+        }
+        FileUtils.deleteFile(unpackedDir, true);
+        unpack(nar, unpackedDir, currentMd5);
+        return unpackedDir;
+    }
+
+    /**
+     * Unpacks the NAR to the specified directory and writes a checksum file
+     * that is used to determine whether future expansion is necessary.
+     *
+     * @param nar
+     *            the NAR to unpack.
+     * @param workingDirectory
+     *            the root directory to which the NAR should be unpacked.
+     * @param hash
+     *            the checksum of the NAR, written to the checksum file.
+     * @throws IOException
+     *             if the NAR could not be unpacked or contains an entry that
+     *             would be written outside of the working directory.
+     */
+    private static void unpack(final File nar, final File workingDirectory, final byte[] hash)
+            throws IOException {
+        final Path workingRoot = workingDirectory.toPath().toAbsolutePath().normalize();
+
+        try (final JarFile jarFile = new JarFile(nar)) {
+            final Enumeration<JarEntry> jarEntries = jarFile.entries();
+            while (jarEntries.hasMoreElements()) {
+                final JarEntry jarEntry = jarEntries.nextElement();
+                final String name = jarEntry.getName();
+                final File f = new File(workingDirectory, name);
+
+                // entry names come from the archive; refuse names such as "../x" that would
+                // place the file outside of the working directory
+                if (!f.toPath().toAbsolutePath().normalize().startsWith(workingRoot)) {
+                    throw new IOException("NAR " + nar.getAbsolutePath() + " contains entry '" + name
+                            + "' that resolves outside of " + workingDirectory.getAbsolutePath());
+                }
+
+                if (jarEntry.isDirectory()) {
+                    FileUtils.ensureDirectoryExistAndCanAccess(f);
+                } else {
+                    makeFile(jarFile.getInputStream(jarEntry), f);
+                }
+            }
+        }
+
+        // record the checksum the unpacked contents correspond to
+        final File hashFile = new File(workingDirectory, HASH_FILENAME);
+        try (final FileOutputStream fos = new FileOutputStream(hashFile)) {
+            fos.write(hash);
+        }
+    }
+
+    /**
+     * Registers the components the jar declares with the extension mapping and
+     * then copies, for every extension name known to the mapping, the jar
+     * entries below docs/&lt;name&gt; into the documentation directory.
+     *
+     * @param jar the jar to inspect
+     * @param docsDirectory the directory the documentation is written to
+     * @param extensionMapping the mapping to add the declared components to
+     * @throws IOException if the jar cannot be read or a documentation file
+     * cannot be written
+     */
+    private static void unpackDocumentation(final File jar, final File docsDirectory,
+            final ExtensionMapping extensionMapping) throws IOException {
+        // determine the components that may have documentation
+        determineDocumentedNiFiComponents(jar, extensionMapping);
+
+        try (final JarFile docsJar = new JarFile(jar)) {
+            for (final String extensionName : extensionMapping.getAllExtensionNames()) {
+                final String docsPrefix = "docs/" + extensionName;
+
+                // scan the whole jar for entries documenting this component
+                final Enumeration<JarEntry> entries = docsJar.entries();
+                while (entries.hasMoreElements()) {
+                    final JarEntry entry = entries.nextElement();
+                    if (!entry.getName().startsWith(docsPrefix)) {
+                        continue;
+                    }
+
+                    final File target = new File(docsDirectory,
+                            StringUtils.substringAfter(entry.getName(), "docs/"));
+
+                    // a file is written straight away
+                    if (!entry.isDirectory()) {
+                        makeFile(docsJar.getInputStream(entry), target);
+                        continue;
+                    }
+
+                    // a directory is created; give up on this component when that fails
+                    if (!target.exists() && !target.mkdirs()) {
+                        logger.warn("Unable to create docs directory "
+                                + target.getAbsolutePath());
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+ /**
+  * Opens the jar and registers, on the given mapping, the names listed in its
+  * Processor, ReportingTask and ControllerService service files (in that
+  * order). A service file that is absent from the jar contributes no names.
+  *
+  * @param jar
+  *            the jar whose {@code META-INF/services} files are read.
+  * @param extensionMapping
+  *            the mapping that receives the names.
+  * @throws IOException
+  *             if the jar or one of its service files cannot be read.
+  */
+ private static void determineDocumentedNiFiComponents(final File jar,
+         final ExtensionMapping extensionMapping) throws IOException {
+     try (final JarFile archive = new JarFile(jar)) {
+         final JarEntry processors = archive
+                 .getJarEntry("META-INF/services/org.apache.nifi.processor.Processor");
+         final JarEntry reportingTasks = archive
+                 .getJarEntry("META-INF/services/org.apache.nifi.reporting.ReportingTask");
+         final JarEntry controllerServices = archive
+                 .getJarEntry("META-INF/services/org.apache.nifi.controller.ControllerService");
+
+         // each service file is read and registered before the next one is touched
+         extensionMapping.addAllProcessors(
+                 determineDocumentedNiFiComponents(archive, processors));
+         extensionMapping.addAllReportingTasks(
+                 determineDocumentedNiFiComponents(archive, reportingTasks));
+         extensionMapping.addAllControllerServices(
+                 determineDocumentedNiFiComponents(archive, controllerServices));
+     }
+ }
+
+ /**
+  * Reads a service file from the jar and returns the names it lists. On each
+  * line everything from the first {@code #} onwards is treated as a comment,
+  * surrounding whitespace is removed, and lines that are empty afterwards are
+  * skipped. The file is decoded as UTF-8.
+  *
+  * @param jarFile
+  *            the open jar that contains the entry.
+  * @param jarEntry
+  *            the service file entry; may be {@code null} when the jar has
+  *            no such file.
+  * @return the listed names in file order; empty when {@code jarEntry} is
+  *         {@code null}.
+  * @throws IOException
+  *             if the entry cannot be read.
+  */
+ private static List<String> determineDocumentedNiFiComponents(final JarFile jarFile,
+         final JarEntry jarEntry) throws IOException {
+     final List<String> componentNames = new ArrayList<>();
+
+     if (jarEntry == null) {
+         return componentNames;
+     }
+
+     try (final InputStream entryInputStream = jarFile.getInputStream(jarEntry);
+             final BufferedReader reader = new BufferedReader(new InputStreamReader(
+                     entryInputStream, java.nio.charset.StandardCharsets.UTF_8))) {
+         String line;
+         while ((line = reader.readLine()) != null) {
+             final int indexOfPound = line.indexOf('#');
+             final String withoutComment = (indexOfPound >= 0) ? line.substring(0,
+                     indexOfPound) : line;
+
+             // trim AFTER cutting the comment so "com.Foo # note" yields "com.Foo"
+             final String effectiveLine = withoutComment.trim();
+             if (!effectiveLine.isEmpty()) {
+                 componentNames.add(effectiveLine);
+             }
+         }
+     }
+
+     return componentNames;
+ }
+
+ /**
+  * Writes everything that can be read from the given stream to the given
+  * file. The stream is closed when this method returns, whether or not the
+  * copy succeeded.
+  *
+  * @param inputStream
+  *            the contents of the file to create.
+  * @param file
+  *            the file to create.
+  * @throws IOException
+  *             if the file could not be created or written.
+  */
+ private static void makeFile(final InputStream inputStream, final File file) throws IOException {
+     try (final InputStream source = inputStream;
+             final FileOutputStream sink = new FileOutputStream(file)) {
+         final byte[] chunk = new byte[65536];
+         for (int count = source.read(chunk); count != -1; count = source.read(chunk)) {
+             sink.write(chunk, 0, count);
+         }
+     }
+ }
+
+ /**
+  * Computes the MD5 digest of the contents of the given file.
+  *
+  * @param file
+  *            the file to digest
+  * @return the digest bytes
+  * @throws IOException
+  *             if the file cannot be read
+  */
+ private static byte[] calculateMd5sum(final File file) throws IOException {
+     try (final FileInputStream source = new FileInputStream(file)) {
+         final MessageDigest digest = MessageDigest.getInstance("md5");
+         final byte[] chunk = new byte[1024];
+
+         // feed the file to the digest one chunk at a time until end of stream
+         int count;
+         while ((count = source.read(chunk)) > -1) {
+             digest.update(chunk, 0, count);
+         }
+
+         return digest.digest();
+     } catch (NoSuchAlgorithmException nsae) {
+         throw new IllegalArgumentException(nsae);
+     }
+ }
+
+ /**
+  * Private constructor: prevents this class from being instantiated from
+  * outside.
+  */
+ private NarUnpacker() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java
new file mode 100644
index 0000000..5462f23
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/main/java/org/apache/nifi/util/FileUtils.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * A utility class containing a few useful static methods to do typical IO
+ * operations.
+ *
+ */
+public class FileUtils {
+
+ public static final long MILLIS_BETWEEN_ATTEMPTS = 50L;
+
+ public static void ensureDirectoryExistAndCanAccess(final File dir) throws IOException {
+ if (dir.exists() && !dir.isDirectory()) {
+ throw new IOException(dir.getAbsolutePath() + " is not a directory");
+ } else if (!dir.exists()) {
+ final boolean made = dir.mkdirs();
+ if (!made) {
+ throw new IOException(dir.getAbsolutePath() + " could not be created");
+ }
+ }
+ if (!(dir.canRead() && dir.canWrite())) {
+ throw new IOException(dir.getAbsolutePath() + " directory does not have read/write privilege");
+ }
+ }
+
+ /**
+ * Deletes the given file. If the given file exists but could not be deleted
+ * this will be printed as a warning to the given logger
+ *
+ * @param file to delete
+ * @param logger to notify
+ * @return true if deleted
+ */
+ public static boolean deleteFile(final File file, final Logger logger) {
+ return FileUtils.deleteFile(file, logger, 1);
+ }
+
+ /**
+ * Deletes the given file. If the given file exists but could not be deleted
+ * this will be printed as a warning to the given logger
+ *
+ * @param file to delete
+ * @param logger to notify
+ * @param attempts indicates how many times an attempt to delete should be
+ * made
+ * @return true if given file no longer exists
+ */
+ public static boolean deleteFile(final File file, final Logger logger, final int attempts) {
+ if (file == null) {
+ return false;
+ }
+ boolean isGone = false;
+ try {
+ if (file.exists()) {
+ final int effectiveAttempts = Math.max(1, attempts);
+ for (int i = 0; i < effectiveAttempts && !isGone; i++) {
+ isGone = file.delete() || !file.exists();
+ if (!isGone && (effectiveAttempts - i) > 1) {
+ FileUtils.sleepQuietly(MILLIS_BETWEEN_ATTEMPTS);
+ }
+ }
+ if (!isGone && logger != null) {
+ logger.warn("File appears to exist but unable to delete file: " + file.getAbsolutePath());
+ }
+ }
+ } catch (final Throwable t) {
+ if (logger != null) {
+ logger.warn("Unable to delete file: '" + file.getAbsolutePath() + "' due to " + t);
+ }
+ }
+ return isGone;
+ }
+
+ /**
+ * Deletes all files (not directories..) in the given directory (non
+ * recursive) that match the given filename filter. If any file cannot be
+ * deleted then this is printed at warn to the given logger.
+ *
+ * @param directory to delete contents of
+ * @param filter if null then no filter is used
+ * @param logger to notify
+ * @deprecated As of release 0.6.0, replaced by
+ * {@link #deleteFilesInDirectory(File, FilenameFilter, Logger)}
+ */
+ @Deprecated
+ public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger) {
+ FileUtils.deleteFilesInDir(directory, filter, logger, false);
+ }
+
+ /**
+ * Deletes all files (not directories) in the given directory (recursive)
+ * that match the given filename filter. If any file cannot be deleted then
+ * this is printed at warn to the given logger.
+ *
+ * @param directory to delete contents of
+ * @param filter if null then no filter is used
+ * @param logger to notify
+ * @param recurse true if should recurse
+ * @deprecated As of release 0.6.0, replaced by
+ * {@link #deleteFilesInDirectory(File, FilenameFilter, Logger, boolean)}
+ */
+ @Deprecated
+ public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) {
+ FileUtils.deleteFilesInDir(directory, filter, logger, recurse, false);
+ }
+
+ /**
+ * Deletes all files (not directories) in the given directory (recursive)
+ * that match the given filename filter. If any file cannot be deleted then
+ * this is printed at warn to the given logger.
+ *
+ * @param directory to delete contents of
+ * @param filter if null then no filter is used
+ * @param logger to notify
+ * @param recurse will look for contents of sub directories.
+ * @param deleteEmptyDirectories default is false; if true will delete
+ * directories found that are empty
+ * @deprecated As of release 0.6.0, replaced by
+ * {@link #deleteFilesInDirectory(File, FilenameFilter, Logger, boolean, boolean)}
+ */
+ @Deprecated
+ public static void deleteFilesInDir(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) {
+ // ensure the specified directory is actually a directory and that it exists
+ if (null != directory && directory.isDirectory()) {
+ final File ingestFiles[] = directory.listFiles();
+ if (ingestFiles == null) {
+ // null if abstract pathname does not denote a directory, or if an I/O error occurs
+ logger.error("Unable to list directory content in: " + directory.getAbsolutePath());
+ }
+ for (File ingestFile : ingestFiles) {
+ boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName());
+ if (ingestFile.isFile() && process) {
+ FileUtils.deleteFile(ingestFile, logger, 3);
+ }
+ if (ingestFile.isDirectory() && recurse) {
+ FileUtils.deleteFilesInDir(ingestFile, filter, logger, recurse, deleteEmptyDirectories);
+ if (deleteEmptyDirectories && ingestFile.list().length == 0) {
+ FileUtils.deleteFile(ingestFile, logger, 3);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Deletes all files (not directories..) in the given directory (non
+ * recursive) that match the given filename filter. If any file cannot be
+ * deleted then this is printed at warn to the given logger.
+ *
+ * @param directory to delete contents of
+ * @param filter if null then no filter is used
+ * @param logger to notify
+ * @throws IOException if abstract pathname does not denote a directory,
+ * or if an I/O error occurs
+ */
+ public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger) throws IOException {
+ FileUtils.deleteFilesInDirectory(directory, filter, logger, false);
+ }
+
+ /**
+ * Deletes all files (not directories) in the given directory (recursive)
+ * that match the given filename filter. If any file cannot be deleted then
+ * this is printed at warn to the given logger.
+ *
+ * @param directory to delete contents of
+ * @param filter if null then no filter is used
+ * @param logger to notify
+ * @param recurse true if should recurse
+ * @throws IOException if abstract pathname does not denote a directory,
+ * or if an I/O error occurs
+ */
+ public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse) throws IOException {
+ FileUtils.deleteFilesInDirectory(directory, filter, logger, recurse, false);
+ }
+
+ /**
+ * Deletes all files (not directories) in the given directory (recursive)
+ * that match the given filename filter. If any file cannot be deleted then
+ * this is printed at warn to the given logger.
+ *
+ * @param directory to delete contents of
+ * @param filter if null then no filter is used
+ * @param logger to notify
+ * @param recurse will look for contents of sub directories.
+ * @param deleteEmptyDirectories default is false; if true will delete
+ * directories found that are empty
+ * @throws IOException if abstract pathname does not denote a directory,
+ * or if an I/O error occurs
+ */
+ public static void deleteFilesInDirectory(final File directory, final FilenameFilter filter, final Logger logger, final boolean recurse, final boolean deleteEmptyDirectories) throws IOException {
+ // ensure the specified directory is actually a directory and that it exists
+ if (null != directory && directory.isDirectory()) {
+ final File ingestFiles[] = directory.listFiles();
+ if (ingestFiles == null) {
+ // null if abstract pathname does not denote a directory, or if an I/O error occurs
+ throw new IOException("Unable to list directory content in: " + directory.getAbsolutePath());
+ }
+ for (File ingestFile : ingestFiles) {
+ boolean process = (filter == null) ? true : filter.accept(directory, ingestFile.getName());
+ if (ingestFile.isFile() && process) {
+ FileUtils.deleteFile(ingestFile, logger, 3);
+ }
+ if (ingestFile.isDirectory() && recurse) {
+ FileUtils.deleteFilesInDirectory(ingestFile, filter, logger, recurse, deleteEmptyDirectories);
+ if (deleteEmptyDirectories && ingestFile.list().length == 0) {
+ FileUtils.deleteFile(ingestFile, logger, 3);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Deletes given files.
+ *
+ * @param files to delete
+ * @param recurse will recurse
+ * @throws IOException if issues deleting files
+ */
+ public static void deleteFiles(final Collection<File> files, final boolean recurse) throws IOException {
+ for (final File file : files) {
+ FileUtils.deleteFile(file, recurse);
+ }
+ }
+
+ public static void deleteFile(final File file, final boolean recurse) throws IOException {
+ final File[] list = file.listFiles();
+ if (file.isDirectory() && recurse && list != null) {
+ FileUtils.deleteFiles(Arrays.asList(list), recurse);
+ }
+ //now delete the file itself regardless of whether it is plain file or a directory
+ if (!FileUtils.deleteFile(file, null, 5)) {
+ throw new IOException("Unable to delete " + file.getAbsolutePath());
+ }
+ }
+
+ public static void sleepQuietly(final long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (final InterruptedException ex) {
+ /* do nothing */
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java
new file mode 100644
index 0000000..882c8c6
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/java/org/apache/nifi/nar/NarUnpackerTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.nar;
+
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.HashSet;
+import java.util.Set;
+
+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class NarUnpackerTest {
+
+ @BeforeClass
+ public static void copyResources() throws IOException {
+
+ final Path sourcePath = Paths.get("./src/test/resources");
+ final Path targetPath = Paths.get("./target");
+
+ Files.walkFileTree(sourcePath, new SimpleFileVisitor<Path>() {
+
+ @Override
+ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
+ throws IOException {
+
+ Path relativeSource = sourcePath.relativize(dir);
+ Path target = targetPath.resolve(relativeSource);
+
+ Files.createDirectories(target);
+
+ return FileVisitResult.CONTINUE;
+
+ }
+
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
+ throws IOException {
+
+ Path relativeSource = sourcePath.relativize(file);
+ Path target = targetPath.resolve(relativeSource);
+
+ Files.copy(file, target, REPLACE_EXISTING);
+
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ }
+
+ @Test
+ public void testUnpackNars() {
+
+ NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties");
+
+ assertEquals("./target/NarUnpacker/lib/",
+ properties.getProperty("nifi.nar.library.directory"));
+ assertEquals("./target/NarUnpacker/lib2/",
+ properties.getProperty("nifi.nar.library.directory.alt"));
+
+ final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties);
+
+ assertEquals(2, extensionMapping.getAllExtensionNames().size());
+
+ assertTrue(extensionMapping.getAllExtensionNames().contains(
+ "org.apache.nifi.processors.dummy.one"));
+ assertTrue(extensionMapping.getAllExtensionNames().contains(
+ "org.apache.nifi.processors.dummy.two"));
+ final File extensionsWorkingDir = properties.getExtensionsWorkingDirectory();
+ File[] extensionFiles = extensionsWorkingDir.listFiles();
+
+ Set<String> expectedNars = new HashSet<>();
+ expectedNars.add("dummy-one.nar-unpacked");
+ expectedNars.add("dummy-two.nar-unpacked");
+ assertEquals(expectedNars.size(), extensionFiles.length);
+
+ for (File extensionFile : extensionFiles) {
+ Assert.assertTrue(expectedNars.contains(extensionFile.getName()));
+ }
+ }
+
+ @Test
+ public void testUnpackNarsFromEmptyDir() throws IOException {
+
+ NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties");
+
+ final File emptyDir = new File("./target/empty/dir");
+ emptyDir.delete();
+ emptyDir.deleteOnExit();
+ assertTrue(emptyDir.mkdirs());
+
+ properties.setProperty("nifi.nar.library.directory.alt", emptyDir.toString());
+
+ final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties);
+
+ assertEquals(1, extensionMapping.getAllExtensionNames().size());
+ assertTrue(extensionMapping.getAllExtensionNames().contains(
+ "org.apache.nifi.processors.dummy.one"));
+
+ final File extensionsWorkingDir = properties.getExtensionsWorkingDirectory();
+ File[] extensionFiles = extensionsWorkingDir.listFiles();
+
+ assertEquals(1, extensionFiles.length);
+ assertEquals("dummy-one.nar-unpacked", extensionFiles[0].getName());
+ }
+
+ @Test
+ public void testUnpackNarsFromNonExistantDir() {
+
+ final File nonExistantDir = new File("./target/this/dir/should/not/exist/");
+ nonExistantDir.delete();
+ nonExistantDir.deleteOnExit();
+
+ NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties");
+ properties.setProperty("nifi.nar.library.directory.alt", nonExistantDir.toString());
+
+ final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties);
+
+ assertTrue(extensionMapping.getAllExtensionNames().contains(
+ "org.apache.nifi.processors.dummy.one"));
+
+ assertEquals(1, extensionMapping.getAllExtensionNames().size());
+
+ final File extensionsWorkingDir = properties.getExtensionsWorkingDirectory();
+ File[] extensionFiles = extensionsWorkingDir.listFiles();
+
+ assertEquals(1, extensionFiles.length);
+ assertEquals("dummy-one.nar-unpacked", extensionFiles[0].getName());
+ }
+
+ @Test
+ public void testUnpackNarsFromNonDir() throws IOException {
+
+ final File nonDir = new File("./target/file.txt");
+ nonDir.createNewFile();
+ nonDir.deleteOnExit();
+
+ NiFiProperties properties = loadSpecifiedProperties("/NarUnpacker/conf/nifi.properties");
+ properties.setProperty("nifi.nar.library.directory.alt", nonDir.toString());
+
+ final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties);
+
+ assertNull(extensionMapping);
+ }
+
+ private NiFiProperties loadSpecifiedProperties(String propertiesFile) {
+ String filePath;
+ try {
+ filePath = NarUnpackerTest.class.getResource(propertiesFile).toURI().getPath();
+ } catch (URISyntaxException ex) {
+ throw new RuntimeException("Cannot load properties file due to "
+ + ex.getLocalizedMessage(), ex);
+ }
+ System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, filePath);
+
+ NiFiProperties properties = NiFiProperties.getInstance();
+
+ // clear out existing properties
+ for (String prop : properties.stringPropertyNames()) {
+ properties.remove(prop);
+ }
+
+ InputStream inStream = null;
+ try {
+ inStream = new BufferedInputStream(new FileInputStream(filePath));
+ properties.load(inStream);
+ } catch (final Exception ex) {
+ throw new RuntimeException("Cannot load properties file due to "
+ + ex.getLocalizedMessage(), ex);
+ } finally {
+ if (null != inStream) {
+ try {
+ inStream.close();
+ } catch (final Exception ex) {
+ /**
+ * do nothing *
+ */
+ }
+ }
+ }
+
+ return properties;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties
new file mode 100644
index 0000000..acbedf9
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/conf/nifi.properties
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Core Properties #
+nifi.version=nifi-test 3.0.0
+nifi.flow.configuration.file=./target/flow.xml.gz
+nifi.flow.configuration.archive.dir=./target/archive/
+nifi.flowcontroller.autoResumeState=true
+nifi.flowcontroller.graceful.shutdown.period=10 sec
+nifi.flowservice.writedelay.interval=2 sec
+nifi.administrative.yield.duration=30 sec
+
+nifi.reporting.task.configuration.file=./target/reporting-tasks.xml
+nifi.controller.service.configuration.file=./target/controller-services.xml
+nifi.templates.directory=./target/templates
+nifi.ui.banner.text=UI Banner Text
+nifi.ui.autorefresh.interval=30 sec
+nifi.nar.library.directory=./target/NarUnpacker/lib/
+nifi.nar.library.directory.alt=./target/NarUnpacker/lib2/
+
+nifi.nar.working.directory=./target/work/nar/
+
+# H2 Settings
+nifi.database.directory=./target/database_repository
+nifi.h2.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
+
+# FlowFile Repository
+nifi.flowfile.repository.directory=./target/test-repo
+nifi.flowfile.repository.partitions=1
+nifi.flowfile.repository.checkpoint.interval=2 mins
+nifi.queue.swap.threshold=20000
+nifi.swap.storage.directory=./target/test-repo/swap
+nifi.swap.in.period=5 sec
+nifi.swap.in.threads=1
+nifi.swap.out.period=5 sec
+nifi.swap.out.threads=4
+
+# Content Repository
+nifi.content.claim.max.appendable.size=10 MB
+nifi.content.claim.max.flow.files=100
+nifi.content.repository.directory.default=./target/content_repository
+
+# Provenance Repository Properties
+nifi.provenance.repository.storage.directory=./target/provenance_repository
+nifi.provenance.repository.max.storage.time=24 hours
+nifi.provenance.repository.max.storage.size=1 GB
+nifi.provenance.repository.rollover.time=30 secs
+nifi.provenance.repository.rollover.size=100 MB
+
+# security properties #
+nifi.sensitive.props.key=key
+nifi.sensitive.props.algorithm=PBEWITHMD5AND256BITAES-CBC-OPENSSL
+nifi.sensitive.props.provider=BC
+
+nifi.security.keystore=
+nifi.security.keystoreType=
+nifi.security.keystorePasswd=
+nifi.security.keyPasswd=
+nifi.security.truststore=
+nifi.security.truststoreType=
+nifi.security.truststorePasswd=
+nifi.security.needClientAuth=
+nifi.security.authorizedUsers.file=./target/conf/authorized-users.xml
+nifi.security.user.credential.cache.duration=24 hours
+nifi.security.user.authority.provider=nifi.authorization.FileAuthorizationProvider
+nifi.security.support.new.account.requests=
+nifi.security.default.user.roles=
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar
new file mode 100644
index 0000000..598b27f
Binary files /dev/null and b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib/dummy-one.nar differ
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib/minifi-framework-nar.nar
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib/minifi-framework-nar.nar b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib/minifi-framework-nar.nar
new file mode 100644
index 0000000..994c5c9
Binary files /dev/null and b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib/minifi-framework-nar.nar differ
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar
new file mode 100644
index 0000000..a1021ba
Binary files /dev/null and b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-nar-utils/src/test/resources/NarUnpacker/lib2/dummy-two.nar differ
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi.sh
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi.sh b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi.sh
index 51bf67e..0482472 100755
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi.sh
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-resources/src/main/resources/bin/minifi.sh
@@ -233,7 +233,7 @@ case "$1" in
install)
install "$@"
;;
- start|stop|run|status|dump|env)
+ start|stop|run|status|flowStatus|dump|env)
main "$@"
;;
restart)
@@ -242,6 +242,6 @@ case "$1" in
run "start"
;;
*)
- echo "Usage minifi {start|stop|run|restart|status|dump|install}"
+ echo "Usage minifi {start|stop|run|restart|status|flowStatus|dump|install}"
;;
esac
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml
index 36c4347..9b02b2a 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/pom.xml
@@ -29,6 +29,15 @@ limitations under the License.
<dependencies>
<dependency>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-nar-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-framework-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-runtime</artifactId>
</dependency>
@@ -40,6 +49,10 @@ limitations under the License.
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <artifactId>minifi-nar-utils</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
index 8e5802f..0357e26 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/BootstrapListener.java
@@ -21,6 +21,7 @@ import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.lang.management.LockInfo;
@@ -42,6 +43,8 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.nifi.minifi.commons.status.FlowStatusReport;
+import org.apache.nifi.minifi.status.StatusRequestException;
import org.apache.nifi.util.LimitingInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,15 +53,15 @@ public class BootstrapListener {
private static final Logger logger = LoggerFactory.getLogger(org.apache.nifi.BootstrapListener.class);
- private final MiNiFi nifi;
+ private final MiNiFi minifi;
private final int bootstrapPort;
private final String secretKey;
private volatile Listener listener;
private volatile ServerSocket serverSocket;
- public BootstrapListener(final MiNiFi nifi, final int bootstrapPort) {
- this.nifi = nifi;
+ public BootstrapListener(final MiNiFi minifi, final int bootstrapPort) {
+ this.minifi = minifi;
this.bootstrapPort = bootstrapPort;
secretKey = UUID.randomUUID().toString();
}
@@ -197,17 +200,22 @@ public class BootstrapListener {
case RELOAD:
logger.info("Received RELOAD request from Bootstrap");
echoReload(socket.getOutputStream());
- nifi.shutdownHook(true);
+ minifi.shutdownHook(true);
return;
case SHUTDOWN:
logger.info("Received SHUTDOWN request from Bootstrap");
echoShutdown(socket.getOutputStream());
- nifi.shutdownHook(false);
+ minifi.shutdownHook(false);
return;
case DUMP:
logger.info("Received DUMP request from Bootstrap");
writeDump(socket.getOutputStream());
break;
+ case FLOW_STATUS_REPORT:
+ logger.info("Received FLOW_STATUS_REPORT request from Bootstrap");
+ String flowStatusRequestString = request.getArgs()[0];
+ writeStatusReport(flowStatusRequestString, socket.getOutputStream());
+ break;
}
} catch (final Throwable t) {
logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
@@ -227,6 +235,13 @@ public class BootstrapListener {
}
}
+ /**
+ * Builds the flow status report for the given request and writes it to {@code out} as a serialized Java object.
+ *
+ * @param flowStatusRequestString the status query string passed along with the FLOW_STATUS_REPORT request
+ * @param out the stream the serialized {@link FlowStatusReport} is written to; it is closed before this method returns
+ * @throws IOException if the object stream cannot be created, written or closed
+ * @throws StatusRequestException declared for the {@code getStatusReport} call (presumably raised when the request cannot be satisfied)
+ */
+ private void writeStatusReport(String flowStatusRequestString, final OutputStream out) throws IOException, StatusRequestException {
+ // try-with-resources: the stream is closed (and therefore flushed) even when building or serializing the report throws,
+ // instead of only on the success path.
+ try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
+ FlowStatusReport flowStatusReport = minifi.getMinifiServer().getStatusReport(flowStatusRequestString);
+ oos.writeObject(flowStatusReport);
+ }
+ }
+
private static void writeDump(final OutputStream out) throws IOException {
final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
@@ -390,7 +405,8 @@ public class BootstrapListener {
RELOAD,
SHUTDOWN,
DUMP,
- PING;
+ PING,
+ FLOW_STATUS_REPORT;
}
private final RequestType requestType;
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
index 669acdc..b2b025b 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-runtime/src/main/java/org/apache/nifi/minifi/MiNiFi.java
@@ -31,13 +31,12 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.nifi.NiFiServer;
-import org.apache.nifi.documentation.DocGenerator;
+// These are from the minifi-nar-utils
import org.apache.nifi.nar.ExtensionManager;
-import org.apache.nifi.nar.ExtensionMapping;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.util.FileUtils;
+
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +45,7 @@ import org.slf4j.bridge.SLF4JBridgeHandler;
public class MiNiFi {
private static final Logger logger = LoggerFactory.getLogger(MiNiFi.class);
- private final NiFiServer nifiServer;
+ private final MiNiFiServer minifiServer;
private final BootstrapListener bootstrapListener;
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
@@ -106,7 +105,7 @@ public class MiNiFi {
SLF4JBridgeHandler.install();
// expand the nars
- final ExtensionMapping extensionMapping = NarUnpacker.unpackNars(properties);
+ NarUnpacker.unpackNars(properties);
// load the extensions classloaders
NarClassLoaders.load(properties);
@@ -121,21 +120,18 @@ public class MiNiFi {
ExtensionManager.discoverExtensions();
ExtensionManager.logClassLoaderMapping();
- DocGenerator.generate(properties);
-
// load the server from the framework classloader
Thread.currentThread().setContextClassLoader(frameworkClassLoader);
- Class<?> jettyServer = Class.forName("org.apache.nifi.web.server.JettyServer", true, frameworkClassLoader);
- Constructor<?> jettyConstructor = jettyServer.getConstructor(NiFiProperties.class);
+ Class<?> minifiServerClass= Class.forName("org.apache.nifi.minifi.MiNiFiServer", true, frameworkClassLoader);
+ Constructor<?> minifiServerConstructor = minifiServerClass.getConstructor(NiFiProperties.class);
final long startTime = System.nanoTime();
- nifiServer = (NiFiServer) jettyConstructor.newInstance(properties);
- nifiServer.setExtensionMapping(extensionMapping);
+ minifiServer = (MiNiFiServer) minifiServerConstructor.newInstance(properties);
if (shutdown) {
logger.info("MiNiFi has been shutdown via MiNiFi Bootstrap. Will not start Controller");
} else {
- nifiServer.start();
+ minifiServer.start();
if (bootstrapListener != null) {
bootstrapListener.sendStartedStatus(true);
@@ -150,9 +146,9 @@ public class MiNiFi {
try {
this.shutdown = true;
- logger.info("Initiating shutdown of Jetty web server...");
- if (nifiServer != null) {
- nifiServer.stop();
+ logger.info("Initiating shutdown of MiNiFi server...");
+ if (minifiServer != null) {
+ minifiServer.stop();
}
if (bootstrapListener != null) {
if (isReload) {
@@ -161,9 +157,9 @@ public class MiNiFi {
bootstrapListener.stop();
}
}
- logger.info("Jetty web server shutdown completed (nicely or otherwise).");
+ logger.info("MiNiFi server shutdown completed (nicely or otherwise).");
} catch (final Throwable t) {
- logger.warn("Problem occurred ensuring Jetty web server was properly terminated due to " + t);
+ logger.warn("Problem occurred ensuring MiNiFi server was properly terminated due to " + t);
}
}
@@ -188,14 +184,14 @@ public class MiNiFi {
});
final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
- final AtomicInteger occurences = new AtomicInteger(0);
+ final AtomicInteger occurrences = new AtomicInteger(0);
final Runnable command = new Runnable() {
@Override
public void run() {
final long curMillis = System.currentTimeMillis();
final long difference = curMillis - lastTriggerMillis.get();
final long millisOff = Math.abs(difference - 2000L);
- occurences.incrementAndGet();
+ occurrences.incrementAndGet();
if (millisOff > 500L) {
occurrencesOutOfRange.incrementAndGet();
}
@@ -211,7 +207,7 @@ public class MiNiFi {
future.cancel(true);
service.shutdownNow();
- if (occurences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
+ if (occurrences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
logger.warn("MiNiFi has detected that this box is not responding within the expected timing interval, which may cause "
+ "Processors to be scheduled erratically. Please see the MiNiFi documentation for more information.");
}
@@ -221,6 +217,10 @@ public class MiNiFi {
timer.schedule(timerTask, 60000L);
}
+ /**
+ * Package-private accessor for the server held by this instance.
+ *
+ * @return the value of the {@code minifiServer} field
+ */
+ MiNiFiServer getMinifiServer() {
+ return this.minifiServer;
+ }
+
/**
* Main entry point of the application.
*
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/pom.xml b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/pom.xml
index 5de51c6..3566ee9 100644
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/pom.xml
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/pom.xml
@@ -27,8 +27,10 @@ limitations under the License.
<artifactId>minifi-framework</artifactId>
<packaging>pom</packaging>
<modules>
+ <module>minifi-framework-core</module>
<module>minifi-runtime</module>
<module>minifi-resources</module>
+ <module>minifi-nar-utils</module>
</modules>
<dependencies>
<dependency>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml
index 9916ccd..243e14d 100644
--- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml
+++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/pom.xml
@@ -51,7 +51,7 @@ limitations under the License.
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-persistent-provenance-repository</artifactId>
- <version>0.6.0</version>
+ <scope>compile</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/99820519/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml
index 1140773..f73490b 100644
--- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml
+++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repository-nar/pom.xml
@@ -36,7 +36,15 @@ limitations under the License.
<artifactId>minifi-persistent-provenance-repository</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-data-provenance-utils</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-persistent-provenance-repository</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
-
-
</project>
\ No newline at end of file