You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/15 11:14:55 UTC
[10/12] incubator-nifi git commit: NIFI-169 well it finally all
builds. There is a classpath issue still to sort out which impacts startup
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml
----------------------------------------------------------------------
diff --git a/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml b/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml
deleted file mode 100644
index 0680d18..0000000
--- a/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<component-set>
- <components>
- <component>
- <role>org.apache.maven.lifecycle.mapping.LifecycleMapping</role>
- <role-hint>nar</role-hint>
- <implementation>org.apache.maven.lifecycle.mapping.DefaultLifecycleMapping</implementation>
- <configuration>
- <lifecycles>
- <lifecycle>
- <id>default</id>
- <phases>
- <process-resources>org.apache.maven.plugins:maven-resources-plugin:resources</process-resources>
- <compile>org.apache.maven.plugins:maven-compiler-plugin:compile</compile>
- <process-test-resources>org.apache.maven.plugins:maven-resources-plugin:testResources</process-test-resources>
- <test-compile>org.apache.maven.plugins:maven-compiler-plugin:testCompile</test-compile>
- <test>org.apache.maven.plugins:maven-surefire-plugin:test</test>
- <package>org.apache.nifi:nar-maven-plugin:nar</package>
- <install>org.apache.maven.plugins:maven-install-plugin:install</install>
- <deploy>org.apache.maven.plugins:maven-deploy-plugin:deploy</deploy>
- </phases>
- </lifecycle>
- </lifecycles>
- </configuration>
- </component>
- <component>
- <role>org.apache.maven.artifact.handler.ArtifactHandler</role>
- <role-hint>nar</role-hint>
- <implementation>org.apache.maven.artifact.handler.DefaultArtifactHandler</implementation>
- <configuration>
- <type>nar</type>
- <language>java</language>
- <addedToClasspath>false</addedToClasspath>
- <includesDependencies>true</includesDependencies>
- </configuration>
- </component>
- </components>
-</component-set>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/misc/pom.xml
----------------------------------------------------------------------
diff --git a/misc/pom.xml b/misc/pom.xml
new file mode 100644
index 0000000..5c7ca7f
--- /dev/null
+++ b/misc/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nar-maven-plugin</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>maven-plugin</packaging>
+ <name>Apache NiFi NAR Plugin</name>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <build>
+ <defaultGoal>install</defaultGoal>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.5</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-plugin-plugin</artifactId>
+ <version>3.3</version>
+ <executions>
+ <execution>
+ <id>default-descriptor</id>
+ <goals>
+ <goal>descriptor</goal>
+ </goals>
+ <phase>process-classes</phase>
+ </execution>
+ <execution>
+ <id>help-descriptor</id>
+ <goals>
+ <goal>helpmojo</goal>
+ </goals>
+ <phase>process-classes</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-plugin-api</artifactId>
+ <version>2.0.11</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.9</version>
+ <type>maven-plugin</type>
+ </dependency>
+ <dependency>
+ <!-- No code from maven-jar-plugin is actually used; it's included
+ just to simplify the dependencies list. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven.plugin-tools</groupId>
+ <artifactId>maven-plugin-annotations</artifactId>
+ <version>3.3</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <distributionManagement>
+ <repository>
+ <id>nifi-releases</id>
+ <url>${nifi.repo.url}</url>
+ </repository>
+ <snapshotRepository>
+ <id>nifi-snapshots</id>
+ <url>${nifi.snapshot.repo.url}</url>
+ </snapshotRepository>
+ </distributionManagement>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/misc/src/main/java/nifi/NarMojo.java
----------------------------------------------------------------------
diff --git a/misc/src/main/java/nifi/NarMojo.java b/misc/src/main/java/nifi/NarMojo.java
new file mode 100644
index 0000000..5196f73
--- /dev/null
+++ b/misc/src/main/java/nifi/NarMojo.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nifi;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.maven.archiver.MavenArchiveConfiguration;
+import org.apache.maven.archiver.MavenArchiver;
+import org.apache.maven.artifact.Artifact;
+import org.apache.maven.artifact.DependencyResolutionRequiredException;
+import org.apache.maven.artifact.factory.ArtifactFactory;
+import org.apache.maven.artifact.installer.ArtifactInstaller;
+import org.apache.maven.artifact.metadata.ArtifactMetadataSource;
+import org.apache.maven.artifact.repository.ArtifactRepository;
+import org.apache.maven.artifact.repository.ArtifactRepositoryFactory;
+import org.apache.maven.artifact.resolver.ArtifactCollector;
+import org.apache.maven.artifact.resolver.ArtifactNotFoundException;
+import org.apache.maven.artifact.resolver.ArtifactResolutionException;
+import org.apache.maven.artifact.resolver.ArtifactResolver;
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugin.MojoFailureException;
+import org.apache.maven.plugin.dependency.utils.DependencyStatusSets;
+import org.apache.maven.plugin.dependency.utils.DependencyUtil;
+import org.apache.maven.plugin.dependency.utils.filters.DestFileFilter;
+import org.apache.maven.plugin.dependency.utils.resolvers.ArtifactsResolver;
+import org.apache.maven.plugin.dependency.utils.resolvers.DefaultArtifactsResolver;
+import org.apache.maven.plugin.dependency.utils.translators.ArtifactTranslator;
+import org.apache.maven.plugin.dependency.utils.translators.ClassifierTypeTranslator;
+import org.apache.maven.plugins.annotations.LifecyclePhase;
+import org.apache.maven.plugins.annotations.Mojo;
+import org.apache.maven.plugins.annotations.Parameter;
+import org.apache.maven.plugins.annotations.ResolutionScope;
+import org.apache.maven.project.MavenProject;
+import org.apache.maven.execution.MavenSession;
+import org.apache.maven.plugins.annotations.Component;
+import org.apache.maven.project.MavenProjectHelper;
+import org.apache.maven.shared.artifact.filter.collection.ArtifactFilterException;
+import org.apache.maven.shared.artifact.filter.collection.ArtifactIdFilter;
+import org.apache.maven.shared.artifact.filter.collection.ArtifactsFilter;
+import org.apache.maven.shared.artifact.filter.collection.ClassifierFilter;
+import org.apache.maven.shared.artifact.filter.collection.FilterArtifacts;
+import org.apache.maven.shared.artifact.filter.collection.GroupIdFilter;
+import org.apache.maven.shared.artifact.filter.collection.ScopeFilter;
+import org.apache.maven.shared.artifact.filter.collection.ProjectTransitivityFilter;
+import org.apache.maven.shared.artifact.filter.collection.TypeFilter;
+import org.codehaus.plexus.archiver.ArchiverException;
+import org.codehaus.plexus.archiver.jar.JarArchiver;
+import org.codehaus.plexus.archiver.jar.ManifestException;
+import org.codehaus.plexus.archiver.manager.ArchiverManager;
+import org.codehaus.plexus.util.FileUtils;
+import org.codehaus.plexus.util.StringUtils;
+
+/**
+ * Packages the current project as an Apache NiFi Archive (NAR).
+ *
+ * The following code is derived from maven-dependency-plugin and
+ * maven-jar-plugin. The functionality of CopyDependenciesMojo and JarMojo was
+ * simplified to the use case of NarMojo.
+ *
+ */
+@Mojo(name = "nar", defaultPhase = LifecyclePhase.PACKAGE, threadSafe = false, requiresDependencyResolution = ResolutionScope.RUNTIME)
+public class NarMojo extends AbstractMojo {
+
+ private static final String[] DEFAULT_EXCLUDES = new String[]{"**/package.html"};
+ private static final String[] DEFAULT_INCLUDES = new String[]{"**/**"};
+
+ /**
+ * POM
+ *
+ */
+ @Parameter(defaultValue = "${project}", readonly = true, required = true)
+ protected MavenProject project;
+
+ @Parameter(defaultValue = "${session}", readonly = true, required = true)
+ protected MavenSession session;
+
+ /**
+ * List of files to include. Specified as fileset patterns.
+ */
+ @Parameter(property = "includes")
+ protected String[] includes;
+ /**
+ * List of files to exclude. Specified as fileset patterns.
+ */
+ @Parameter(property = "excludes")
+ protected String[] excludes;
+ /**
+ * Name of the generated NAR.
+ *
+ */
+ @Parameter(alias = "narName", property = "nar.finalName", defaultValue = "${project.build.finalName}", required = true)
+ protected String finalName;
+
+ /**
+ * The Jar archiver.
+ *
+ * \@\component role="org.codehaus.plexus.archiver.Archiver" roleHint="jar"
+ */
+ @Component(role = org.codehaus.plexus.archiver.Archiver.class, hint = "jar")
+ private JarArchiver jarArchiver;
+ /**
+ * The archive configuration to use.
+ *
+ * See <a
+ * href="http://maven.apache.org/shared/maven-archiver/index.html">the
+ * documentation for Maven Archiver</a>.
+ *
+ */
+ @Parameter(property = "archive")
+ protected final MavenArchiveConfiguration archive = new MavenArchiveConfiguration();
+ /**
+ * Path to the default MANIFEST file to use. It will be used if
+ * <code>useDefaultManifestFile</code> is set to <code>true</code>.
+ *
+ */
+ @Parameter(property = "defaultManifestFiles", defaultValue = "${project.build.outputDirectory}/META-INF/MANIFEST.MF", readonly = true, required = true)
+ protected File defaultManifestFile;
+
+ /**
+ * Set this to <code>true</code> to enable the use of the
+ * <code>defaultManifestFile</code>.
+ *
+ * @since 2.2
+ */
+ @Parameter(property = "nar.useDefaultManifestFile", defaultValue = "false")
+ protected boolean useDefaultManifestFile;
+
+ @Component
+ protected MavenProjectHelper projectHelper;
+
+ /**
+ * Whether creating the archive should be forced.
+ *
+ */
+ @Parameter(property = "nar.forceCreation", defaultValue = "false")
+ protected boolean forceCreation;
+
+ /**
+ * Classifier to add to the artifact generated. If given, the artifact will
+ * be an attachment instead.
+ *
+ */
+ @Parameter(property = "classifier")
+ protected String classifier;
+
+ @Component
+ protected ArtifactInstaller installer;
+
+ @Component
+ protected ArtifactRepositoryFactory repositoryFactory;
+
+ /**
+ * This only applies if the classifier parameter is used.
+ *
+ */
+ @Parameter(property = "mdep.failOnMissingClassifierArtifact", defaultValue = "true", required = false)
+ protected boolean failOnMissingClassifierArtifact = true;
+
+ /**
+ * Comma Separated list of Types to include. Empty String indicates include
+ * everything (default).
+ *
+ */
+ @Parameter(property = "includeTypes", required = false)
+ protected String includeTypes;
+
+ /**
+ * Comma Separated list of Types to exclude. Empty String indicates don't
+ * exclude anything (default).
+ *
+ */
+ @Parameter(property = "excludeTypes", required = false)
+ protected String excludeTypes;
+
+ /**
+ * Scope to include. An Empty string indicates all scopes (default).
+ *
+ */
+ @Parameter(property = "includeScope", required = false)
+ protected String includeScope;
+
+ /**
+ * Scope to exclude. An Empty string indicates no scopes (default).
+ *
+ */
+ @Parameter(property = "excludeScope", required = false)
+ protected String excludeScope;
+
+ /**
+ * Comma Separated list of Classifiers to include. Empty String indicates
+ * include everything (default).
+ *
+ */
+ @Parameter(property = "includeClassifiers", required = false)
+ protected String includeClassifiers;
+
+ /**
+ * Comma Separated list of Classifiers to exclude. Empty String indicates
+ * don't exclude anything (default).
+ *
+ */
+ @Parameter(property = "excludeClassifiers", required = false)
+ protected String excludeClassifiers;
+
+ /**
+ * Specify classifier to look for. Example: sources
+ *
+ */
+ @Parameter(property = "classifier", required = false)
+ protected String copyDepClassifier;
+
+ /**
+ * Specify type to look for when constructing artifact based on classifier.
+ * Example: java-source,jar,war, nar
+ *
+ */
+ @Parameter(property = "type", required = false, defaultValue = "nar")
+ protected String type;
+
+ /**
+ * Comma separated list of Artifact names to exclude.
+ *
+ */
+ @Parameter(property = "excludeArtifacts", required = false)
+ protected String excludeArtifactIds;
+
+ /**
+ * Comma separated list of Artifact names to include.
+ *
+ */
+ @Parameter(property = "includeArtifacts", required = false)
+ protected String includeArtifactIds;
+
+ /**
+  * Comma separated list of GroupId Names to exclude.
+  *
+  * Bound to its own user property so that it no longer shares a property
+  * name with the artifact id exclusion list.
+  */
+ @Parameter(property = "excludeGroupIds", required = false)
+ protected String excludeGroupIds;
+
+ /**
+ * Comma separated list of GroupIds to include.
+ *
+ */
+ @Parameter(property = "includeGroupIds", required = false)
+ protected String includeGroupIds;
+
+ /**
+ * Directory to store flag files
+ *
+ */
+ @Parameter(property = "markersDirectory", required = false, defaultValue = "${project.build.directory}/dependency-maven-plugin-markers")
+ protected File markersDirectory;
+
+ /**
+ * Overwrite release artifacts
+ *
+ */
+ @Parameter(property = "overWriteReleases", required = false)
+ protected boolean overWriteReleases;
+
+ /**
+ * Overwrite snapshot artifacts
+ *
+ */
+ @Parameter(property = "overWriteSnapshots", required = false)
+ protected boolean overWriteSnapshots;
+
+ /**
+ * Overwrite artifacts that don't exist or are older than the source.
+ *
+ */
+ @Parameter(property = "overWriteIfNewer", required = false, defaultValue = "true")
+ protected boolean overWriteIfNewer;
+
+ /**
+ * Used to look up Artifacts in the remote repository.
+ */
+ @Component
+ protected ArtifactFactory factory;
+
+ /**
+ * Used to look up Artifacts in the remote repository.
+ *
+ */
+ @Component
+ protected ArtifactResolver resolver;
+
+ /**
+ * Artifact collector, needed to resolve dependencies.
+ *
+ */
+ @Component(role = org.apache.maven.artifact.resolver.ArtifactCollector.class)
+ protected ArtifactCollector artifactCollector;
+
+ @Component(role = org.apache.maven.artifact.metadata.ArtifactMetadataSource.class)
+ protected ArtifactMetadataSource artifactMetadataSource;
+
+ /**
+ * Location of the local repository.
+ *
+ */
+ @Parameter(property = "localRepository", required = true, readonly = true)
+ protected ArtifactRepository local;
+
+ /**
+ * List of Remote Repositories used by the resolver
+ *
+ */
+ @Parameter(property = "project.remoteArtifactRepositories", required = true, readonly = true)
+ protected List remoteRepos;
+
+ /**
+ * To look up Archiver/UnArchiver implementations
+ *
+ */
+ @Component
+ protected ArchiverManager archiverManager;
+
+ /**
+ * Contains the full list of projects in the reactor.
+ *
+ */
+ @Parameter(property = "reactorProjects", required = true, readonly = true)
+ protected List reactorProjects;
+
+ /**
+ * If the plugin should be silent.
+ *
+ */
+ @Parameter(property = "silent", required = false, defaultValue = "false")
+ public boolean silent;
+
+ /**
+ * Output absolute filename for resolved artifacts
+ *
+ */
+ @Parameter(property = "outputAbsoluteArtifactFilename", defaultValue = "false", required = false)
+ protected boolean outputAbsoluteArtifactFilename;
+
+ /**
+  * Entry point of the goal: first stages the filtered dependencies, then
+  * assembles the NAR.
+  *
+  * @throws MojoExecutionException if staging the dependencies or building
+  * the archive fails
+  * @throws MojoFailureException declared by the Mojo contract
+  */
+ @Override
+ public void execute() throws MojoExecutionException, MojoFailureException {
+     this.copyDependencies();
+     this.makeNar();
+ }
+
+ private void copyDependencies() throws MojoExecutionException {
+ DependencyStatusSets dss = getDependencySets(this.failOnMissingClassifierArtifact);
+ Set artifacts = dss.getResolvedDependencies();
+
+ for (Object artifactObj : artifacts) {
+ copyArtifact((Artifact) artifactObj);
+ }
+
+ artifacts = dss.getSkippedDependencies();
+ for (Object artifactOjb : artifacts) {
+ Artifact artifact = (Artifact) artifactOjb;
+ getLog().info(artifact.getFile().getName() + " already exists in destination.");
+ }
+ }
+
+ /**
+  * Copies the file of a single artifact into the dependencies directory,
+  * using the file name and output directory computed by DependencyUtil.
+  *
+  * @param artifact the artifact whose file is copied
+  * @throws MojoExecutionException if the copy fails
+  */
+ protected void copyArtifact(Artifact artifact) throws MojoExecutionException {
+     final String targetName = DependencyUtil.getFormattedFileName(artifact, false);
+     final File targetDir = DependencyUtil.getFormattedOutputDirectory(false, false, false, false, false, getDependenciesDirectory(), artifact);
+     copyFile(artifact.getFile(), new File(targetDir, targetName));
+ }
+
+ /**
+  * Creates the "pom" artifact matching the coordinates of the given
+  * artifact and tries to resolve it against the remote and local
+  * repositories.
+  *
+  * Resolution is best effort: a failure is logged and the (unresolved)
+  * pom artifact is still returned.
+  *
+  * @param artifact the artifact whose POM is wanted
+  * @return the pom artifact, resolved when possible
+  */
+ protected Artifact getResolvedPomArtifact(Artifact artifact) {
+     Artifact pomArtifact = this.factory.createArtifact(artifact.getGroupId(), artifact.getArtifactId(), artifact.getVersion(), "", "pom");
+     // Resolve the pom artifact using repos
+     try {
+         this.resolver.resolve(pomArtifact, this.remoteRepos, this.local);
+     } catch (ArtifactResolutionException | ArtifactNotFoundException e) {
+         // say which POM could not be resolved, and keep the stack trace for -X runs
+         getLog().info("Unable to resolve POM for " + pomArtifact.getId() + ": " + e.getMessage());
+         getLog().debug(e);
+     }
+     return pomArtifact;
+ }
+
+ protected ArtifactsFilter getMarkedArtifactFilter() {
+ return new DestFileFilter(this.overWriteReleases, this.overWriteSnapshots, this.overWriteIfNewer, false, false, false, false, false, getDependenciesDirectory());
+ }
+
+ /**
+  * Runs the project's artifacts through the configured filters (NAR-typed
+  * dependencies are always excluded) and sorts the survivors into
+  * status sets.
+  *
+  * @param stopOnFailure passed on to the classifier translation step
+  * @return the status sets for the filtered artifacts
+  * @throws MojoExecutionException if a filter fails
+  */
+ protected DependencyStatusSets getDependencySets(boolean stopOnFailure) throws MojoExecutionException {
+     // filters are registered in a fixed order, least specific first
+     final FilterArtifacts chain = new FilterArtifacts();
+     chain.addFilter(new ProjectTransitivityFilter(project.getDependencyArtifacts(), false));
+     chain.addFilter(new ScopeFilter(this.includeScope, this.excludeScope));
+     chain.addFilter(new TypeFilter(this.includeTypes, this.excludeTypes));
+     chain.addFilter(new ClassifierFilter(this.includeClassifiers, this.excludeClassifiers));
+     chain.addFilter(new GroupIdFilter(this.includeGroupIds, this.excludeGroupIds));
+     chain.addFilter(new ArtifactIdFilter(this.includeArtifactIds, this.excludeArtifactIds));
+
+     // nar dependencies are never copied into the archive
+     chain.addFilter(new TypeFilter("", "nar"));
+
+     // every project artifact is a candidate
+     final Set candidates = project.getArtifacts();
+
+     final Set filtered;
+     try {
+         filtered = chain.filter(candidates);
+     } catch (ArtifactFilterException e) {
+         throw new MojoExecutionException(e.getMessage(), e);
+     }
+
+     // a configured classifier means the artifacts have to be translated first
+     if (StringUtils.isNotEmpty(copyDepClassifier)) {
+         return getClassifierTranslatedDependencies(filtered, stopOnFailure);
+     }
+     return filterMarkedDependencies(filtered);
+ }
+
+ protected DependencyStatusSets getClassifierTranslatedDependencies(Set artifacts, boolean stopOnFailure) throws MojoExecutionException {
+ Set unResolvedArtifacts = new HashSet();
+ Set resolvedArtifacts = artifacts;
+ DependencyStatusSets status = new DependencyStatusSets();
+
+ // possibly translate artifacts into a new set of artifacts based on the
+ // classifier and type
+ // if this did something, we need to resolve the new artifacts
+ if (StringUtils.isNotEmpty(copyDepClassifier)) {
+ ArtifactTranslator translator = new ClassifierTypeTranslator(this.copyDepClassifier, this.type, this.factory);
+ artifacts = translator.translate(artifacts, getLog());
+
+ status = filterMarkedDependencies(artifacts);
+
+ // the unskipped artifacts are in the resolved set.
+ artifacts = status.getResolvedDependencies();
+
+ // resolve the rest of the artifacts
+ ArtifactsResolver artifactsResolver = new DefaultArtifactsResolver(this.resolver, this.local,
+ this.remoteRepos, stopOnFailure);
+ resolvedArtifacts = artifactsResolver.resolve(artifacts, getLog());
+
+ // calculate the artifacts not resolved.
+ unResolvedArtifacts.addAll(artifacts);
+ unResolvedArtifacts.removeAll(resolvedArtifacts);
+ }
+
+ // return a bean of all 3 sets.
+ status.setResolvedDependencies(resolvedArtifacts);
+ status.setUnResolvedDependencies(unResolvedArtifacts);
+
+ return status;
+ }
+
+ protected DependencyStatusSets filterMarkedDependencies(Set artifacts) throws MojoExecutionException {
+ // remove files that have markers already
+ FilterArtifacts filter = new FilterArtifacts();
+ filter.clearFilters();
+ filter.addFilter(getMarkedArtifactFilter());
+
+ Set unMarkedArtifacts;
+ try {
+ unMarkedArtifacts = filter.filter(artifacts);
+ } catch (ArtifactFilterException e) {
+ throw new MojoExecutionException(e.getMessage(), e);
+ }
+
+ // calculate the skipped artifacts
+ Set skippedArtifacts = new HashSet();
+ skippedArtifacts.addAll(artifacts);
+ skippedArtifacts.removeAll(unMarkedArtifacts);
+
+ return new DependencyStatusSets(unMarkedArtifacts, null, skippedArtifacts);
+ }
+
+ protected void copyFile(File artifact, File destFile) throws MojoExecutionException {
+ try {
+ getLog().info("Copying " + (this.outputAbsoluteArtifactFilename ? artifact.getAbsolutePath() : artifact.getName()) + " to " + destFile);
+ FileUtils.copyFile(artifact, destFile);
+ } catch (Exception e) {
+ throw new MojoExecutionException("Error copying artifact from " + artifact + " to " + destFile, e);
+ }
+ }
+
+ /**
+  * Returns the directory holding the project's compiled classes and
+  * resources, which is the content root of the NAR.
+  *
+  * The location is taken from the project's build configuration rather
+  * than assumed to be <code>target/classes</code>, so a customised
+  * output directory is honoured.
+  *
+  * @return the build output directory of the project
+  */
+ private File getClassesDirectory() {
+     return new File(project.getBuild().getOutputDirectory());
+ }
+
+ private File getDependenciesDirectory() {
+ return new File(getClassesDirectory(), "META-INF/dependencies");
+ }
+
+ private void makeNar() throws MojoExecutionException {
+ File narFile = createArchive();
+
+ if (classifier != null) {
+ projectHelper.attachArtifact(project, "nar", classifier, narFile);
+ } else {
+ project.getArtifact().setFile(narFile);
+ }
+ }
+
+ /**
+  * Assembles the NAR from the classes directory and writes it into the
+  * project's build directory.
+  *
+  * The manifest always receives a <code>Nar-Id</code> entry holding the
+  * artifact id, plus a <code>Nar-Dependency-Id</code> entry when the
+  * project has a NAR dependency.
+  *
+  * @return the NAR file that was written
+  * @throws MojoExecutionException if the archive cannot be assembled
+  */
+ public File createArchive() throws MojoExecutionException {
+     // use the configured build directory instead of assuming "<basedir>/target"
+     final File outputDirectory = new File(project.getBuild().getDirectory());
+     File narFile = getNarFile(outputDirectory, finalName, classifier);
+     MavenArchiver archiver = new MavenArchiver();
+     archiver.setArchiver(jarArchiver);
+     archiver.setOutputFile(narFile);
+     archive.setForced(forceCreation);
+
+     try {
+         File contentDirectory = getClassesDirectory();
+         if (!contentDirectory.exists()) {
+             getLog().warn("NAR will be empty - no content was marked for inclusion!");
+         } else {
+             archiver.getArchiver().addDirectory(contentDirectory, getIncludes(), getExcludes());
+         }
+
+         // an explicitly configured manifest file wins over the default one
+         File existingManifest = defaultManifestFile;
+         if (useDefaultManifestFile && existingManifest.exists() && archive.getManifestFile() == null) {
+             getLog().info("Adding existing MANIFEST to archive. Found under: " + existingManifest.getPath());
+             archive.setManifestFile(existingManifest);
+         }
+
+         // automatically add the artifact id to the manifest
+         archive.addManifestEntry("Nar-Id", project.getArtifactId());
+
+         // look for a nar dependency
+         String narDependency = getNarDependency();
+         if (narDependency != null) {
+             archive.addManifestEntry("Nar-Dependency-Id", narDependency);
+         }
+
+         archiver.createArchive(session, project, archive);
+         return narFile;
+     } catch (ArchiverException | MojoExecutionException | ManifestException | IOException | DependencyResolutionRequiredException e) {
+         throw new MojoExecutionException("Error assembling NAR", e);
+     }
+ }
+
+ private String[] getIncludes() {
+ if (includes != null && includes.length > 0) {
+ return includes;
+ }
+ return DEFAULT_INCLUDES;
+ }
+
+ private String[] getExcludes() {
+ if (excludes != null && excludes.length > 0) {
+ return excludes;
+ }
+ return DEFAULT_EXCLUDES;
+ }
+
+ protected File getNarFile(File basedir, String finalName, String classifier) {
+ if (classifier == null) {
+ classifier = "";
+ } else if (classifier.trim().length() > 0 && !classifier.startsWith("-")) {
+ classifier = "-" + classifier;
+ }
+
+ return new File(basedir, finalName + classifier + ".nar");
+ }
+
+ /**
+  * Looks up the artifact id of the project's NAR dependency.
+  *
+  * @return the artifact id of the single NAR-typed artifact, or
+  * <code>null</code> when there is none
+  * @throws MojoExecutionException if filtering fails or more than one
+  * NAR-typed artifact is present
+  */
+ private String getNarDependency() throws MojoExecutionException {
+     // keep only the nar-typed artifacts
+     final FilterArtifacts narOnly = new FilterArtifacts();
+     narOnly.addFilter(new TypeFilter("nar", ""));
+
+     final Set candidates = project.getArtifacts();
+
+     final Set narArtifacts;
+     try {
+         narArtifacts = narOnly.filter(candidates);
+     } catch (ArtifactFilterException e) {
+         throw new MojoExecutionException(e.getMessage(), e);
+     }
+
+     final int narCount = narArtifacts.size();
+     if (narCount > 1) {
+         // a NAR can only have one parent
+         throw new MojoExecutionException("Each NAR represents a ClassLoader. A NAR dependency allows that NAR's ClassLoader to be "
+                 + "used as the parent of this NAR's ClassLoader. As a result, only a single NAR dependency is allowed.");
+     }
+     if (narCount == 1) {
+         final Artifact narArtifact = (Artifact) narArtifacts.iterator().next();
+         return narArtifact.getArtifactId();
+     }
+     return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/misc/src/main/resources/META-INF/plexus/components.xml
----------------------------------------------------------------------
diff --git a/misc/src/main/resources/META-INF/plexus/components.xml b/misc/src/main/resources/META-INF/plexus/components.xml
new file mode 100644
index 0000000..0680d18
--- /dev/null
+++ b/misc/src/main/resources/META-INF/plexus/components.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<component-set>
+ <components>
+ <component>
+ <role>org.apache.maven.lifecycle.mapping.LifecycleMapping</role>
+ <role-hint>nar</role-hint>
+ <implementation>org.apache.maven.lifecycle.mapping.DefaultLifecycleMapping</implementation>
+ <configuration>
+ <lifecycles>
+ <lifecycle>
+ <id>default</id>
+ <phases>
+ <process-resources>org.apache.maven.plugins:maven-resources-plugin:resources</process-resources>
+ <compile>org.apache.maven.plugins:maven-compiler-plugin:compile</compile>
+ <process-test-resources>org.apache.maven.plugins:maven-resources-plugin:testResources</process-test-resources>
+ <test-compile>org.apache.maven.plugins:maven-compiler-plugin:testCompile</test-compile>
+ <test>org.apache.maven.plugins:maven-surefire-plugin:test</test>
+ <package>org.apache.nifi:nar-maven-plugin:nar</package>
+ <install>org.apache.maven.plugins:maven-install-plugin:install</install>
+ <deploy>org.apache.maven.plugins:maven-deploy-plugin:deploy</deploy>
+ </phases>
+ </lifecycle>
+ </lifecycles>
+ </configuration>
+ </component>
+ <component>
+ <role>org.apache.maven.artifact.handler.ArtifactHandler</role>
+ <role-hint>nar</role-hint>
+ <implementation>org.apache.maven.artifact.handler.DefaultArtifactHandler</implementation>
+ <configuration>
+ <type>nar</type>
+ <language>java</language>
+ <addedToClasspath>false</addedToClasspath>
+ <includesDependencies>true</includesDependencies>
+ </configuration>
+ </component>
+ </components>
+</component-set>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
deleted file mode 100644
index 6280349..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
+++ /dev/null
@@ -1,67 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>distributed-cache-services-bundle</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>distributed-cache-client-service</artifactId>
- <packaging>jar</packaging>
-
- <name>Distributed Cache Client Service</name>
- <description>Provides a Client for interfacing with a Distributed Cache</description>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>distributed-cache-client-service-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>distributed-cache-protocol</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>remote-communications-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-processor-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-stream-utils</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>ssl-context-service-api</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.9</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
deleted file mode 100644
index f838c2f..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.client;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-
-public interface CommsSession extends Closeable {
-
- void setTimeout(final long value, final TimeUnit timeUnit);
-
- InputStream getInputStream() throws IOException;
-
- OutputStream getOutputStream() throws IOException;
-
- boolean isClosed();
-
- void interrupt();
-
- String getHostname();
-
- int getPort();
-
- long getTimeout(TimeUnit timeUnit);
-
- SSLContext getSSLContext();
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
deleted file mode 100644
index ee96660..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.client;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.annotation.OnConfigured;
-import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
-import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
-import org.apache.nifi.io.ByteArrayOutputStream;
-import org.apache.nifi.io.DataOutputStream;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
-
- private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
-
- public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
- .name("Server Hostname")
- .description("The name of the server that is running the DistributedMapCacheServer service")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
- .name("Server Port")
- .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
- .required(true)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .defaultValue("4557")
- .build();
- public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description(
- "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
- .required(false)
- .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
- .defaultValue(null)
- .build();
- public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Communications Timeout")
- .description(
- "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("30 secs")
- .build();
-
- private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
- private volatile ConfigurationContext configContext;
- private volatile boolean closed = false;
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(HOSTNAME);
- descriptors.add(PORT);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(COMMUNICATIONS_TIMEOUT);
- return descriptors;
- }
-
- @OnConfigured
- public void cacheConfig(final ConfigurationContext context) {
- this.configContext = context;
- }
-
- @Override
- public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
- throws IOException {
- return withCommsSession(new CommsAction<Boolean>() {
- @Override
- public Boolean execute(final CommsSession session) throws IOException {
- final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
- dos.writeUTF("putIfAbsent");
-
- serialize(key, keySerializer, dos);
- serialize(value, valueSerializer, dos);
-
- dos.flush();
-
- final DataInputStream dis = new DataInputStream(session.getInputStream());
- return dis.readBoolean();
- }
- });
- }
-
- @Override
- public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
- return withCommsSession(new CommsAction<Boolean>() {
- @Override
- public Boolean execute(final CommsSession session) throws IOException {
- final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
- dos.writeUTF("containsKey");
-
- serialize(key, keySerializer, dos);
- dos.flush();
-
- final DataInputStream dis = new DataInputStream(session.getInputStream());
- return dis.readBoolean();
- }
- });
- }
-
- @Override
- public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
- final Deserializer<V> valueDeserializer) throws IOException {
- return withCommsSession(new CommsAction<V>() {
- @Override
- public V execute(final CommsSession session) throws IOException {
- final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
- dos.writeUTF("getAndPutIfAbsent");
-
- serialize(key, keySerializer, dos);
- serialize(value, valueSerializer, dos);
- dos.flush();
-
- // read response
- final DataInputStream dis = new DataInputStream(session.getInputStream());
- final byte[] responseBuffer = readLengthDelimitedResponse(dis);
- return valueDeserializer.deserialize(responseBuffer);
- }
- });
- }
-
- @Override
- public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
- return withCommsSession(new CommsAction<V>() {
- @Override
- public V execute(final CommsSession session) throws IOException {
- final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
- dos.writeUTF("get");
-
- serialize(key, keySerializer, dos);
- dos.flush();
-
- // read response
- final DataInputStream dis = new DataInputStream(session.getInputStream());
- final byte[] responseBuffer = readLengthDelimitedResponse(dis);
- return valueDeserializer.deserialize(responseBuffer);
- }
- });
- }
-
- @Override
- public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
- return withCommsSession(new CommsAction<Boolean>() {
- @Override
- public Boolean execute(final CommsSession session) throws IOException {
- final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
- dos.writeUTF("remove");
-
- serialize(key, serializer, dos);
- dos.flush();
-
- // read response
- final DataInputStream dis = new DataInputStream(session.getInputStream());
- return dis.readBoolean();
- }
- });
- }
-
- private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
- final int responseLength = dis.readInt();
- final byte[] responseBuffer = new byte[responseLength];
- dis.readFully(responseBuffer);
- return responseBuffer;
- }
-
- public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
- final String hostname = context.getProperty(HOSTNAME).getValue();
- final int port = context.getProperty(PORT).asInteger();
- final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
- final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
- final CommsSession commsSession;
- if (sslContextService == null) {
- commsSession = new StandardCommsSession(hostname, port);
- } else {
- commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
- }
-
- commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
- return commsSession;
- }
-
- private CommsSession leaseCommsSession() throws IOException {
- CommsSession session = queue.poll();
- if (session != null && !session.isClosed()) {
- return session;
- }
-
- session = createCommsSession(configContext);
- final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
- try {
- ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
- } catch (final HandshakeException e) {
- try {
- session.close();
- } catch (final IOException ioe) {
- }
-
- throw new IOException(e);
- }
-
- return session;
- }
-
- @Override
- public void close() throws IOException {
- this.closed = true;
-
- CommsSession commsSession;
- while ((commsSession = queue.poll()) != null) {
- try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
- dos.writeUTF("close");
- dos.flush();
- commsSession.close();
- } catch (final IOException e) {
- }
- }
- logger.info("Closed {}", new Object[] { getIdentifier() });
- }
-
- @Override
- protected void finalize() throws Throwable {
- if (!closed)
- close();
- logger.debug("Finalize called");
- }
-
- private <T> void serialize(final T value, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- serializer.serialize(value, baos);
- dos.writeInt(baos.size());
- baos.writeTo(dos);
- }
-
- private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
- if (closed) {
- throw new IllegalStateException("Client is closed");
- }
-
- final CommsSession session = leaseCommsSession();
- try {
- return action.execute(session);
- } catch (final IOException ioe) {
- try {
- session.close();
- } catch (final IOException ignored) {
- }
-
- throw ioe;
- } finally {
- if (!session.isClosed()) {
- if (this.closed) {
- try {
- session.close();
- } catch (final IOException ioe) {
- }
- } else {
- queue.offer(session);
- }
- }
- }
- }
-
- private static interface CommsAction<T> {
- T execute(CommsSession commsSession) throws IOException;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
deleted file mode 100644
index 1d7c94c..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedSetCacheClientService.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.client;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.annotation.OnConfigured;
-import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
-import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
-import org.apache.nifi.io.ByteArrayOutputStream;
-import org.apache.nifi.io.DataOutputStream;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.remote.StandardVersionNegotiator;
-import org.apache.nifi.remote.VersionNegotiator;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.ssl.SSLContextService.ClientAuth;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DistributedSetCacheClientService extends AbstractControllerService implements DistributedSetCacheClient {
-
- private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
-
- public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
- .name("Server Hostname")
- .description("The name of the server that is running the DistributedSetCacheServer service")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
- public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
- .name("Server Port")
- .description("The port on the remote server that is to be used when communicating with the DistributedSetCacheServer service")
- .required(true)
- .addValidator(StandardValidators.PORT_VALIDATOR)
- .defaultValue("4557")
- .build();
- public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
- .name("SSL Context Service")
- .description(
- "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
- .required(false)
- .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
- .defaultValue(null)
- .build();
- public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
- .name("Communications Timeout")
- .description(
- "Specifices how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
- .required(true)
- .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("30 secs")
- .build();
-
- private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
- private volatile ConfigurationContext configContext;
- private volatile boolean closed = false;
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- final List<PropertyDescriptor> descriptors = new ArrayList<>();
- descriptors.add(HOSTNAME);
- descriptors.add(PORT);
- descriptors.add(SSL_CONTEXT_SERVICE);
- descriptors.add(COMMUNICATIONS_TIMEOUT);
- return descriptors;
- }
-
- @OnConfigured
- public void onConfigured(final ConfigurationContext context) {
- this.configContext = context;
- }
-
- public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
- final String hostname = context.getProperty(HOSTNAME).getValue();
- final int port = context.getProperty(PORT).asInteger();
- final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
- final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-
- final CommsSession commsSession;
- if (sslContextService == null) {
- commsSession = new StandardCommsSession(hostname, port);
- } else {
- commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
- }
-
- commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
- return commsSession;
- }
-
- private CommsSession leaseCommsSession() throws IOException {
- CommsSession session = queue.poll();
- if (session != null && !session.isClosed()) {
- return session;
- }
-
- session = createCommsSession(configContext);
- final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
- try {
- ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
- } catch (final HandshakeException e) {
- try {
- session.close();
- } catch (final IOException ioe) {
- }
-
- throw new IOException(e);
- }
-
- return session;
- }
-
- @Override
- public <T> boolean addIfAbsent(final T value, final Serializer<T> serializer) throws IOException {
- return invokeRemoteBoolean("addIfAbsent", value, serializer);
- }
-
- @Override
- public <T> boolean contains(final T value, final Serializer<T> serializer) throws IOException {
- return invokeRemoteBoolean("contains", value, serializer);
- }
-
- @Override
- public <T> boolean remove(final T value, final Serializer<T> serializer) throws IOException {
- return invokeRemoteBoolean("remove", value, serializer);
- }
-
- @Override
- public void close() throws IOException {
- this.closed = true;
-
- CommsSession commsSession;
- while ((commsSession = queue.poll()) != null) {
- try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
- dos.writeUTF("close");
- dos.flush();
- commsSession.close();
- } catch (final IOException e) {
- }
- }
- logger.info("Closed {}", new Object[] { getIdentifier() });
- }
-
- @Override
- protected void finalize() throws Throwable {
- if (!closed)
- close();
- logger.debug("Finalize called");
- }
-
- private <T> boolean invokeRemoteBoolean(final String methodName, final T value, final Serializer<T> serializer) throws IOException {
- if (closed) {
- throw new IllegalStateException("Client is closed");
- }
-
- final CommsSession session = leaseCommsSession();
- try {
- final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
- dos.writeUTF(methodName);
-
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- serializer.serialize(value, baos);
- dos.writeInt(baos.size());
- baos.writeTo(dos);
- dos.flush();
-
- final DataInputStream dis = new DataInputStream(session.getInputStream());
- return dis.readBoolean();
- } catch (final IOException ioe) {
- try {
- session.close();
- } catch (final IOException ignored) {
- }
-
- throw ioe;
- } finally {
- if (!session.isClosed()) {
- if (this.closed) {
- try {
- session.close();
- } catch (final IOException ioe) {
- }
- } else {
- queue.offer(session);
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
deleted file mode 100644
index c8be082..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/SSLCommsSession.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.client;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.io.BufferedInputStream;
-import org.apache.nifi.io.BufferedOutputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelOutputStream;
-
-public class SSLCommsSession implements CommsSession {
- private final SSLSocketChannel sslSocketChannel;
- private final SSLContext sslContext;
- private final String hostname;
- private final int port;
-
- private final SSLSocketChannelInputStream in;
- private final BufferedInputStream bufferedIn;
-
- private final SSLSocketChannelOutputStream out;
- private final BufferedOutputStream bufferedOut;
-
- public SSLCommsSession(final SSLContext sslContext, final String hostname, final int port) throws IOException {
- sslSocketChannel = new SSLSocketChannel(sslContext, hostname, port, true);
-
- in = new SSLSocketChannelInputStream(sslSocketChannel);
- bufferedIn = new BufferedInputStream(in);
-
- out = new SSLSocketChannelOutputStream(sslSocketChannel);
- bufferedOut = new BufferedOutputStream(out);
-
- this.sslContext = sslContext;
- this.hostname = hostname;
- this.port = port;
- }
-
- @Override
- public void interrupt() {
- sslSocketChannel.interrupt();
- }
-
- @Override
- public void close() throws IOException {
- sslSocketChannel.close();
- }
-
- @Override
- public void setTimeout(final long value, final TimeUnit timeUnit) {
- sslSocketChannel.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return bufferedIn;
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- return bufferedOut;
- }
-
- @Override
- public boolean isClosed() {
- return sslSocketChannel.isClosed();
- }
-
- @Override
- public String getHostname() {
- return hostname;
- }
-
- @Override
- public int getPort() {
- return port;
- }
- @Override
- public SSLContext getSSLContext() {
- return sslContext;
- }
- @Override
- public long getTimeout(final TimeUnit timeUnit) {
- return timeUnit.convert(sslSocketChannel.getTimeout(), TimeUnit.MILLISECONDS);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
deleted file mode 100644
index bbe2917..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/StandardCommsSession.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.client;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.nifi.io.BufferedInputStream;
-import org.apache.nifi.io.BufferedOutputStream;
-import org.apache.nifi.remote.io.InterruptableInputStream;
-import org.apache.nifi.remote.io.InterruptableOutputStream;
-import org.apache.nifi.remote.io.socket.SocketChannelInputStream;
-import org.apache.nifi.remote.io.socket.SocketChannelOutputStream;
-
-public class StandardCommsSession implements CommsSession {
- private final SocketChannel socketChannel;
- private final String hostname;
- private final int port;
- private volatile long timeoutMillis;
-
- private final SocketChannelInputStream in;
- private final InterruptableInputStream bufferedIn;
-
- private final SocketChannelOutputStream out;
- private final InterruptableOutputStream bufferedOut;
-
- public StandardCommsSession(final String hostname, final int port) throws IOException {
- socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port));
- socketChannel.configureBlocking(false);
- in = new SocketChannelInputStream(socketChannel);
- bufferedIn = new InterruptableInputStream(new BufferedInputStream(in));
-
- out = new SocketChannelOutputStream(socketChannel);
- bufferedOut = new InterruptableOutputStream(new BufferedOutputStream(out));
-
- this.hostname = hostname;
- this.port = port;
- }
-
- @Override
- public void interrupt() {
- bufferedIn.interrupt();
- bufferedOut.interrupt();
- }
-
- @Override
- public void close() throws IOException {
- socketChannel.close();
- }
-
- @Override
- public void setTimeout(final long value, final TimeUnit timeUnit) {
- in.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
- out.setTimeout((int) TimeUnit.MILLISECONDS.convert(value, timeUnit));
- timeoutMillis = TimeUnit.MILLISECONDS.convert(value, timeUnit);
- }
-
- @Override
- public InputStream getInputStream() throws IOException {
- return bufferedIn;
- }
-
- @Override
- public OutputStream getOutputStream() throws IOException {
- return bufferedOut;
- }
-
- @Override
- public boolean isClosed() {
- boolean closed = !socketChannel.isConnected();
- if (!closed) {
- try {
- this.in.isDataAvailable();
- } catch (IOException e) {
- try {
- close();
- } catch (IOException e1) {
- }
- closed = true;
- }
- }
- return closed;
- }
-
- @Override
- public String getHostname() {
- return hostname;
- }
-
- @Override
- public int getPort() {
- return port;
- }
-
- @Override
- public SSLContext getSSLContext() {
- return null;
- }
-
- @Override
- public long getTimeout(final TimeUnit timeUnit) {
- return timeUnit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
deleted file mode 100644
index a91f7ee..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService
-org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
deleted file mode 100644
index d5f3595..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService/index.html
+++ /dev/null
@@ -1,78 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<head>
-<meta charset="utf-8" />
-<title>Distributed Map Cache Client Service</title>
-<link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
-</head>
-
-<body>
- <h2>Description:</h2>
-
- <p>A Controller Service that can be used to communicate with a
- Distributed Map Cache Server.</p>
-
-
-
- <p>
- <strong>Properties:</strong>
- </p>
- <p>In the list below, the names of required properties appear
- in bold. Any other properties (not in bold) are considered optional.
- If a property has a default value, it is indicated. If a property
- supports the use of the NiFi Expression Language (or simply,
- "expression language"), that is also indicated.</p>
-
- <ul>
- <li><strong>Server Hostname</strong>
- <ul>
- <li>The name of the server that is running the DistributedMapCacheServer service</li>
- <li>Default value: no default</li>
- <li>Supports expression language: false</li>
- </ul></li>
- <li><strong>Server Port</strong>
- <ul>
- <li>The port on the remote server that is to be used when communicating with the
- <a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">DistributedMapCacheServer</a> service</li>
-
- <li>Default value: 4557</li>
- <li>Supports expression language: false</li>
- </ul></li>
- <li>SSL Context Service
- <ul>
- <li>If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted.</li>
- <li>Default value: no default</li>
- <li>Supports expression language: false</li>
- </ul></li>
- <li><strong>Communications Timeout</strong>
- <ul>
- <li>Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received.</li>
- <li>Default value: 30 secs</li>
- <li>Supports expression language: false</li>
- </ul></li>
-
- </ul>
-
-
- <i>See Also:</i>
- <ul>
- <li><a href="../org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer/index.html">Distributed Map Cache Server</a></li>
- <li><a href="../org.apache.nifi.ssl.StandardSSLContextService/index.html">Standard SSL Context Service</a></li>
- </ul>
-
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/resources/docs/org.apache.nifi.distributed.cache.client.DistributedSetCacheClientService/index.html
deleted file mode 100755
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
deleted file mode 100644
index bc612ae..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/pom.xml
+++ /dev/null
@@ -1,39 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>distributed-cache-services-bundle</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>distributed-cache-protocol</artifactId>
- <name>Distributed Cache Protocol</name>
-
- <description>
- Defines the communications protocol that is used between clients and servers
- for the Distributed Cache services
- </description>
-
-
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>remote-communications-utils</artifactId>
- </dependency>
- </dependencies>
-
-</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/19d4a150/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
deleted file mode 100644
index da2acad..0000000
--- a/nar-bundles/distributed-cache-services-bundle/distributed-cache-protocol/src/main/java/org/apache/nifi/distributed/cache/protocol/ProtocolHandshake.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.distributed.cache.protocol;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
-import org.apache.nifi.remote.VersionNegotiator;
-
-/**
- * Static helpers implementing the opening handshake of the Distributed Cache protocol.
- * <p>
- * The initiating side writes {@link #MAGIC_HEADER} followed by the protocol version it proposes.
- * The receiving side answers with a single status byte: {@link #RESOURCE_OK} when it supports the
- * proposed version, {@link #DIFFERENT_RESOURCE_VERSION} followed by its own preferred version
- * (an {@code int}) when it does not, or {@link #ABORT} when no common version can be found.
- * After {@link #DIFFERENT_RESOURCE_VERSION} the initiator proposes again, so the exchange may
- * take several rounds.
- */
-public class ProtocolHandshake {
-
-    public static final byte[] MAGIC_HEADER = new byte[] { 'N', 'i', 'F', 'i' };
-
-    public static final int RESOURCE_OK = 20;
-    public static final int DIFFERENT_RESOURCE_VERSION = 21;
-    public static final int ABORT = 255;
-
-
-    /**
-     * Performs the initiating (client) side of the handshake: writes the magic header and
-     * negotiates a protocol version. The output stream is flushed even if negotiation fails.
-     *
-     * @param in stream from which the remote side's responses are read
-     * @param out stream to which the header and version proposals are written
-     * @param versionNegotiator supplies the proposed version and is updated with the agreed one
-     * @throws IOException if reading from or writing to the streams fails
-     * @throws HandshakeException if no version can be agreed on or the remote side aborts
-     */
-    public static void initiateHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
-        final DataInputStream dis = new DataInputStream(in);
-        final DataOutputStream dos = new DataOutputStream(out);
-
-        try {
-            dos.write(MAGIC_HEADER);
-
-            initiateVersionNegotiation(versionNegotiator, dis, dos);
-        } finally {
-            dos.flush();
-        }
-    }
-
-
-    /**
-     * Performs the receiving (server) side of the handshake: consumes the magic header and
-     * answers the initiator's version proposals. The output stream is flushed even if
-     * negotiation fails.
-     *
-     * @param in stream from which the header and version proposals are read
-     * @param out stream to which status responses are written
-     * @param versionNegotiator decides which versions are acceptable and is updated with the agreed one
-     * @throws IOException if reading from or writing to the streams fails
-     * @throws HandshakeException if no acceptable version can be negotiated
-     */
-    public static void receiveHandshake(final InputStream in, final OutputStream out, final VersionNegotiator versionNegotiator) throws IOException, HandshakeException {
-        final DataInputStream dis = new DataInputStream(in);
-        final DataOutputStream dos = new DataOutputStream(out);
-
-        try {
-            // The header bytes are consumed here but their contents are not compared
-            // against MAGIC_HEADER.
-            final byte[] magicHeaderBuffer = new byte[MAGIC_HEADER.length];
-            dis.readFully(magicHeaderBuffer);
-
-            receiveVersionNegotiation(versionNegotiator, dis, dos);
-        } finally {
-            dos.flush();
-        }
-    }
-
-
-    /**
-     * Proposes the negotiator's current version and reacts to the remote side's status byte,
-     * re-proposing with a lower version when the remote side asks for a different one.
-     *
-     * @param negotiator supplies the version to propose and receives the new preference
-     * @param dis stream from which the status byte and the remote side's preferred version are read
-     * @param dos stream to which the version proposal is written
-     * @throws IOException if reading from or writing to the streams fails
-     * @throws HandshakeException if no version can be agreed on, the remote side aborts, or an
-     *         unexpected status code is received
-     */
-    private static void initiateVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
-        // Propose our current protocol version.
-        dos.writeInt(negotiator.getVersion());
-        dos.flush();
-
-        // wait for response from server.
-        final int statusCode = dis.read();
-        switch (statusCode) {
-            case RESOURCE_OK: // server accepted the proposed version
-                return;
-            case DIFFERENT_RESOURCE_VERSION: // server rejected the proposed version
-                // Get server's preferred version
-                final int newVersion = dis.readInt();
-
-                // Determine our new preferred version that is no greater than the server's preferred version.
-                final Integer newPreference = negotiator.getPreferredVersion(newVersion);
-                // If we could not agree with server on a version, fail now.
-                if ( newPreference == null ) {
-                    throw new HandshakeException("Could not agree on protocol version");
-                }
-
-                negotiator.setVersion(newPreference);
-
-                // Attempt negotiation of resource based on our new preferred version.
-                initiateVersionNegotiation(negotiator, dis, dos);
-                // Without this return, a successful renegotiation would fall through into the
-                // ABORT case and fail the handshake.
-                return;
-            case ABORT:
-                // NOTE(review): a UTF message is read here, but receiveVersionNegotiation in this
-                // class writes only the ABORT byte -- confirm what the remote side actually sends.
-                throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF());
-            default:
-                throw new HandshakeException("Received unexpected response code " + statusCode + " when negotiating version with remote server");
-        }
-    }
-
-    /**
-     * Reads a proposed version and answers it: accepts a supported version, otherwise offers
-     * this side's preferred version and waits for the next proposal, or aborts when there is no
-     * preferred version to offer.
-     *
-     * @param negotiator decides whether a version is supported and receives the agreed version
-     * @param dis stream from which version proposals are read
-     * @param dos stream to which status responses are written
-     * @throws IOException if reading from or writing to the streams fails
-     * @throws HandshakeException if no acceptable version can be negotiated
-     */
-    private static void receiveVersionNegotiation(final VersionNegotiator negotiator, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException {
-        final int version = dis.readInt();
-        if ( negotiator.isVersionSupported(version) ) {
-            dos.write(RESOURCE_OK);
-            dos.flush();
-
-            negotiator.setVersion(version);
-        } else {
-            final Integer preferred = negotiator.getPreferredVersion(version);
-            if ( preferred == null ) {
-                dos.write(ABORT);
-                dos.flush();
-                throw new HandshakeException("Unable to negotiate an acceptable version of the Distributed Cache Protocol");
-            }
-            dos.write(DIFFERENT_RESOURCE_VERSION);
-            dos.writeInt(preferred);
-            dos.flush();
-
-            // Wait for the initiator's next proposal.
-            receiveVersionNegotiation(negotiator, dis, dos);
-        }
-    }
-}