You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:58 UTC
[35/51] [partial] incubator-nifi git commit: Initial code contribution
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java b/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java
new file mode 100644
index 0000000..55f59ba
--- /dev/null
+++ b/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import org.apache.nifi.authorization.annotation.AuthorityProviderContext;
+import org.apache.nifi.authorization.exception.AuthorityAccessException;
+import org.apache.nifi.authorization.exception.IdentityAlreadyExistsException;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.authorization.exception.UnknownIdentityException;
+import org.apache.nifi.file.FileUtils;
+import org.apache.nifi.user.generated.ObjectFactory;
+import org.apache.nifi.user.generated.Role;
+import org.apache.nifi.user.generated.User;
+import org.apache.nifi.user.generated.Users;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+/**
+ * Provides identity checks and grants authorities.
+ */
+public class FileAuthorizationProvider implements AuthorityProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(FileAuthorizationProvider.class);
+ private static final String USERS_XSD = "/users.xsd";
+ private static final String JAXB_GENERATED_PATH = "org.apache.nifi.user.generated";
+ private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+ /**
+ * Load the JAXBContext.
+ */
+ private static JAXBContext initializeJaxbContext() {
+ try {
+ return JAXBContext.newInstance(JAXB_GENERATED_PATH, FileAuthorizationProvider.class.getClassLoader());
+ } catch (JAXBException e) {
+ throw new RuntimeException("Unable to create JAXBContext.");
+ }
+ }
+
+ private NiFiProperties properties;
+ private File usersFile;
+ private File restoreUsersFile;
+ private Users users;
+ private final Set<String> defaultAuthorities = new HashSet<>();
+
+ @Override
+ public void initialize(final AuthorityProviderInitializationContext initializationContext) throws ProviderCreationException {
+ }
+
+ @Override
+ public void onConfigured(final AuthorityProviderConfigurationContext configurationContext) throws ProviderCreationException {
+ try {
+ final String usersFilePath = configurationContext.getProperty("Authorized Users File");
+ if (usersFilePath == null || usersFilePath.trim().isEmpty()) {
+ throw new ProviderCreationException("The authorized users file must be specified.");
+ }
+
+ // the users file instance will never be null because a default is used
+ usersFile = new File(usersFilePath);
+ final File usersFileDirectory = usersFile.getParentFile();
+
+ // the restore directory is optional and may be null
+ final File restoreDirectory = properties.getRestoreDirectory();
+
+ if (restoreDirectory != null) {
+
+ // sanity check that restore directory is a directory, creating it if necessary
+ FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
+
+ // check that restore directory is not the same as the primary directory
+ if (usersFileDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
+ throw new ProviderCreationException(String.format("Authorized User's directory '%s' is the same as restore directory '%s' ",
+ usersFileDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
+ }
+
+ // the restore copy will have same file name, but reside in a different directory
+ restoreUsersFile = new File(restoreDirectory, usersFile.getName());
+
+ // sync the primary copy with the restore copy
+ try {
+ FileUtils.syncWithRestore(usersFile, restoreUsersFile, logger);
+ } catch (final IOException | IllegalStateException ioe) {
+ throw new ProviderCreationException(ioe);
+ }
+
+ }
+
+ // load the users from the specified file
+ if (usersFile.exists()) {
+ // find the schema
+ final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ final Schema schema = schemaFactory.newSchema(FileAuthorizationProvider.class.getResource(USERS_XSD));
+
+ // attempt to unmarshal
+ final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+ unmarshaller.setSchema(schema);
+ final JAXBElement<Users> element = unmarshaller.unmarshal(new StreamSource(usersFile), Users.class);
+ users = element.getValue();
+ } else {
+ final ObjectFactory objFactory = new ObjectFactory();
+ users = objFactory.createUsers();
+ }
+
+ // attempt to load a default roles
+ final String rawDefaultAuthorities = configurationContext.getProperty("Default User Roles");
+ if (StringUtils.isNotBlank(rawDefaultAuthorities)) {
+ final Set<String> invalidDefaultAuthorities = new HashSet<>();
+
+ // validate the specified authorities
+ final String[] rawDefaultAuthorityList = rawDefaultAuthorities.split(",");
+ for (String rawAuthority : rawDefaultAuthorityList) {
+ rawAuthority = rawAuthority.trim();
+ final Authority authority = Authority.valueOfAuthority(rawAuthority);
+ if (authority == null) {
+ invalidDefaultAuthorities.add(rawAuthority);
+ } else {
+ defaultAuthorities.add(rawAuthority);
+ }
+ }
+
+ // report any unrecognized authorities
+ if (!invalidDefaultAuthorities.isEmpty()) {
+ logger.warn(String.format("The following default role(s) '%s' were not recognized. Possible values: %s.",
+ StringUtils.join(invalidDefaultAuthorities, ", "), StringUtils.join(Authority.getRawAuthorities(), ", ")));
+ }
+ }
+ } catch (IOException | ProviderCreationException | SAXException | JAXBException e) {
+ throw new ProviderCreationException(e);
+ }
+
+ }
+
+ @Override
+ public void preDestruction() {
+ }
+
+ /**
+ * Determines if this provider has a default role.
+ *
+ * @return
+ */
+ private boolean hasDefaultRoles() {
+ return !defaultAuthorities.isEmpty();
+ }
+
+ /**
+ * Determines if the specified dn is known to this authority provider. When
+ * this provider is configured to have default role(s), all dn are
+ * considered to exist.
+ *
+ * @param dn
+ * @return True if he dn is known, false otherwise
+ */
+ @Override
+ public boolean doesDnExist(String dn) throws AuthorityAccessException {
+ if (hasDefaultRoles()) {
+ return true;
+ }
+
+ final User user = getUser(dn);
+ return user != null;
+ }
+
+ /**
+ * Loads the authorities for the specified user. If this provider is
+ * configured for default user role(s) and a non existent dn is specified, a
+ * new user will be automatically created with the default role(s).
+ *
+ * @param dn
+ * @return
+ * @throws UnknownIdentityException
+ * @throws AuthorityAccessException
+ */
+ @Override
+ public synchronized Set<Authority> getAuthorities(String dn) throws UnknownIdentityException, AuthorityAccessException {
+ final Set<Authority> authorities = EnumSet.noneOf(Authority.class);
+
+ // get the user
+ final User user = getUser(dn);
+
+ // ensure the user was located
+ if (user == null) {
+ if (hasDefaultRoles()) {
+ logger.debug(String.format("User DN not found: %s. Creating new user with default roles.", dn));
+
+ // create the user (which will automatically add any default authorities)
+ addUser(dn, null);
+
+ // get the authorities for the newly created user
+ authorities.addAll(getAuthorities(dn));
+ } else {
+ throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+ }
+ } else {
+ // create the authorities that this user has
+ for (final Role role : user.getRole()) {
+ authorities.add(Authority.valueOfAuthority(role.getName()));
+ }
+ }
+
+ return authorities;
+ }
+
+ /**
+ * Adds the specified authorities to the specified user. Regardless of
+ * whether this provider is configured for a default user role, when a non
+ * existent dn is specified, an UnknownIdentityException will be thrown.
+ *
+ * @param dn
+ * @param authorities
+ * @throws UnknownIdentityException
+ * @throws AuthorityAccessException
+ */
+ @Override
+ public synchronized void setAuthorities(String dn, Set<Authority> authorities) throws UnknownIdentityException, AuthorityAccessException {
+ // get the user
+ final User user = getUser(dn);
+
+ // ensure the user was located
+ if (user == null) {
+ throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+ }
+
+ // add the user authorities
+ setUserAuthorities(user, authorities);
+
+ try {
+ // save the file
+ save();
+ } catch (Exception e) {
+ throw new AuthorityAccessException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Adds the specified authorities to the specified user.
+ *
+ * @param user
+ * @param authorities
+ */
+ private void setUserAuthorities(final User user, final Set<Authority> authorities) {
+ // clear the existing rules
+ user.getRole().clear();
+
+ // set the new roles
+ final ObjectFactory objFactory = new ObjectFactory();
+ for (final Authority authority : authorities) {
+ final Role role = objFactory.createRole();
+ role.setName(authority.toString());
+
+ // add the new role
+ user.getRole().add(role);
+ }
+ }
+
+ /**
+ * Adds the specified user. If this provider is configured with default
+ * role(s) they will be added to the new user.
+ *
+ * @param dn
+ * @param group
+ * @throws UnknownIdentityException
+ * @throws AuthorityAccessException
+ */
+ @Override
+ public synchronized void addUser(String dn, String group) throws IdentityAlreadyExistsException, AuthorityAccessException {
+ final User user = getUser(dn);
+
+ // ensure the user doesn't already exist
+ if (user != null) {
+ throw new IdentityAlreadyExistsException(String.format("User DN already exists: %s", dn));
+ }
+
+ // create the new user
+ final ObjectFactory objFactory = new ObjectFactory();
+ final User newUser = objFactory.createUser();
+
+ // set the user properties
+ newUser.setDn(dn);
+ newUser.setGroup(group);
+
+ // add default roles if appropriate
+ if (hasDefaultRoles()) {
+ for (final String authority : defaultAuthorities) {
+ Role role = objFactory.createRole();
+ role.setName(authority);
+
+ // add the role
+ newUser.getRole().add(role);
+ }
+ }
+
+ // add the user
+ users.getUser().add(newUser);
+
+ try {
+ // save the file
+ save();
+ } catch (Exception e) {
+ throw new AuthorityAccessException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Gets the users for the specified authority.
+ *
+ * @param authority
+ * @return
+ * @throws AuthorityAccessException
+ */
+ @Override
+ public synchronized Set<String> getUsers(Authority authority) throws AuthorityAccessException {
+ final Set<String> userSet = new HashSet<>();
+ for (final User user : users.getUser()) {
+ for (final Role role : user.getRole()) {
+ if (role.getName().equals(authority.toString())) {
+ userSet.add(user.getDn());
+ }
+ }
+ }
+ return userSet;
+ }
+
+ /**
+ * Removes the specified user. Regardless of whether this provider is
+ * configured for a default user role, when a non existent dn is specified,
+ * an UnknownIdentityException will be thrown.
+ *
+ * @param dn
+ * @throws UnknownIdentityException
+ * @throws AuthorityAccessException
+ */
+ @Override
+ public synchronized void revokeUser(String dn) throws UnknownIdentityException, AuthorityAccessException {
+ // get the user
+ final User user = getUser(dn);
+
+ // ensure the user was located
+ if (user == null) {
+ throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+ }
+
+ // remove the specified user
+ users.getUser().remove(user);
+
+ try {
+ // save the file
+ save();
+ } catch (Exception e) {
+ throw new AuthorityAccessException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void setUsersGroup(Set<String> dns, String group) throws UnknownIdentityException, AuthorityAccessException {
+ final Collection<User> groupedUsers = new HashSet<>();
+
+ // get the specified users
+ for (final String dn : dns) {
+ // get the user
+ final User user = getUser(dn);
+
+ // ensure the user was located
+ if (user == null) {
+ throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+ }
+
+ groupedUsers.add(user);
+ }
+
+ // update each user group
+ for (final User user : groupedUsers) {
+ user.setGroup(group);
+ }
+
+ try {
+ // save the file
+ save();
+ } catch (Exception e) {
+ throw new AuthorityAccessException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void ungroupUser(String dn) throws UnknownIdentityException, AuthorityAccessException {
+ // get the user
+ final User user = getUser(dn);
+
+ // ensure the user was located
+ if (user == null) {
+ throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+ }
+
+ // remove the users group
+ user.setGroup(null);
+
+ try {
+ // save the file
+ save();
+ } catch (Exception e) {
+ throw new AuthorityAccessException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void ungroup(String group) throws AuthorityAccessException {
+ // get the user group
+ final Collection<User> userGroup = getUserGroup(group);
+
+ // ensure the user group was located
+ if (userGroup == null) {
+ return;
+ }
+
+ // update each user group
+ for (final User user : userGroup) {
+ user.setGroup(null);
+ }
+
+ try {
+ // save the file
+ save();
+ } catch (Exception e) {
+ throw new AuthorityAccessException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public String getGroupForUser(String dn) throws UnknownIdentityException, AuthorityAccessException {
+ // get the user
+ final User user = getUser(dn);
+
+ // ensure the user was located
+ if (user == null) {
+ throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+ }
+
+ return user.getGroup();
+ }
+
+ @Override
+ public void revokeGroup(String group) throws UnknownIdentityException, AuthorityAccessException {
+ // get the user group
+ final Collection<User> userGroup = getUserGroup(group);
+
+ // ensure the user group was located
+ if (userGroup == null) {
+ throw new UnknownIdentityException(String.format("User group not found: %s.", group));
+ }
+
+ // remove each user in the group
+ for (final User user : userGroup) {
+ users.getUser().remove(user);
+ }
+
+ try {
+ // save the file
+ save();
+ } catch (Exception e) {
+ throw new AuthorityAccessException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Locates the user with the specified DN.
+ *
+ * @param dn
+ * @return
+ */
+ private User getUser(String dn) throws UnknownIdentityException {
+ // ensure the DN was specified
+ if (dn == null) {
+ throw new UnknownIdentityException("User DN not specified.");
+ }
+
+ // attempt to get the user and ensure it was located
+ User desiredUser = null;
+ for (final User user : users.getUser()) {
+ if (dn.equalsIgnoreCase(user.getDn())) {
+ desiredUser = user;
+ break;
+ }
+ }
+
+ return desiredUser;
+ }
+
+ /**
+ * Locates all users that are part of the specified group.
+ *
+ * @param group
+ * @return
+ * @throws UnknownIdentityException
+ */
+ private Collection<User> getUserGroup(String group) throws UnknownIdentityException {
+ // ensure the DN was specified
+ if (group == null) {
+ throw new UnknownIdentityException("User group not specified.");
+ }
+
+ // get all users with this group
+ Collection<User> userGroup = null;
+ for (final User user : users.getUser()) {
+ if (group.equals(user.getGroup())) {
+ if (userGroup == null) {
+ userGroup = new HashSet<>();
+ }
+ userGroup.add(user);
+ }
+ }
+
+ return userGroup;
+ }
+
+ /**
+ * Saves the users file.
+ *
+ * @throws Exception
+ */
+ private void save() throws Exception {
+ final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+
+ // save users to restore directory before primary directory
+ if (restoreUsersFile != null) {
+ marshaller.marshal(users, restoreUsersFile);
+ }
+
+ // save users to primary directory
+ marshaller.marshal(users, usersFile);
+ }
+
+ @AuthorityProviderContext
+ public void setNiFiProperties(NiFiProperties properties) {
+ this.properties = properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
----------------------------------------------------------------------
diff --git a/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider b/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
new file mode 100755
index 0000000..93d2941
--- /dev/null
+++ b/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.authorization.FileAuthorizationProvider
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/main/xsd/users.xsd
----------------------------------------------------------------------
diff --git a/extensions/file-authorization-provider/src/main/xsd/users.xsd b/extensions/file-authorization-provider/src/main/xsd/users.xsd
new file mode 100644
index 0000000..4ee1e17
--- /dev/null
+++ b/extensions/file-authorization-provider/src/main/xsd/users.xsd
@@ -0,0 +1,64 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
+ <!-- role -->
+ <xs:complexType name="Role">
+ <xs:attribute name="name">
+ <xs:simpleType>
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="ROLE_MONITOR"/>
+ <xs:enumeration value="ROLE_PROVENANCE"/>
+ <xs:enumeration value="ROLE_DFM"/>
+ <xs:enumeration value="ROLE_ADMIN"/>
+ <xs:enumeration value="ROLE_PROXY"/>
+ <xs:enumeration value="ROLE_NIFI"/>
+ </xs:restriction>
+ </xs:simpleType>
+ </xs:attribute>
+ </xs:complexType>
+
+ <!-- user -->
+ <xs:complexType name="User">
+ <xs:sequence>
+ <xs:element name="role" type="Role" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ <xs:attribute name="dn">
+ <xs:simpleType>
+ <xs:restriction base="xs:string">
+ <xs:minLength value="1"/>
+ <xs:pattern value=".*[^\s].*"/>
+ </xs:restriction>
+ </xs:simpleType>
+ </xs:attribute>
+ <xs:attribute name="group">
+ <xs:simpleType>
+ <xs:restriction base="xs:string">
+ <xs:minLength value="1"/>
+ <xs:pattern value=".*[^\s].*"/>
+ </xs:restriction>
+ </xs:simpleType>
+ </xs:attribute>
+ </xs:complexType>
+
+ <!-- users -->
+ <xs:element name="users">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="user" type="User" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+</xs:schema>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java
----------------------------------------------------------------------
diff --git a/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java b/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java
new file mode 100644
index 0000000..3d0196d
--- /dev/null
+++ b/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.file.FileUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+import org.junit.Ignore;
+import org.mockito.Mockito;
+
+@Ignore
+public class FileAuthorizationProviderTest {
+
+ private FileAuthorizationProvider provider;
+
+ private File primary;
+
+ private File restore;
+
+ private NiFiProperties mockProperties;
+
+ private AuthorityProviderConfigurationContext mockConfigurationContext;
+
+ @Before
+ public void setup() throws IOException {
+
+ primary = new File("target/primary/users.txt");
+ restore = new File("target/restore/users.txt");
+
+ System.out.println("absolute path: " + primary.getAbsolutePath());
+
+ mockProperties = mock(NiFiProperties.class);
+ when(mockProperties.getRestoreDirectory()).thenReturn(restore.getParentFile());
+
+ mockConfigurationContext = mock(AuthorityProviderConfigurationContext.class);
+ when(mockConfigurationContext.getProperty(Mockito.eq("Authorized Users File"))).thenReturn(primary.getPath());
+
+ provider = new FileAuthorizationProvider();
+ provider.setNiFiProperties(mockProperties);
+ provider.initialize(null);
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ deleteFile(primary);
+ deleteFile(restore);
+ }
+
+ private boolean deleteFile(final File file) {
+ if(file.isDirectory()) {
+ FileUtils.deleteFilesInDir(file, null, null, true, true);
+ }
+ return FileUtils.deleteFile(file, null, 10);
+ }
+
+ @Test
+ public void testPostContructionWhenRestoreDoesNotExist() throws Exception {
+
+ byte[] primaryBytes = "<users/>".getBytes();
+ FileOutputStream fos = new FileOutputStream(primary);
+ fos.write(primaryBytes);
+ fos.close();
+
+ provider.onConfigured(mockConfigurationContext);
+ assertEquals(primary.length(), restore.length());
+ }
+
+ @Test
+ public void testPostContructionWhenPrimaryDoesNotExist() throws Exception {
+
+ byte[] restoreBytes = "<users/>".getBytes();
+ FileOutputStream fos = new FileOutputStream(restore);
+ fos.write(restoreBytes);
+ fos.close();
+
+ provider.onConfigured(mockConfigurationContext);
+ assertEquals(restore.length(), primary.length());
+
+ }
+
+ @Test(expected = ProviderCreationException.class)
+ public void testPostContructionWhenPrimaryDifferentThanRestore() throws Exception {
+
+ byte[] primaryBytes = "<users></users>".getBytes();
+ FileOutputStream fos = new FileOutputStream(primary);
+ fos.write(primaryBytes);
+ fos.close();
+
+ byte[] restoreBytes = "<users/>".getBytes();
+ fos = new FileOutputStream(restore);
+ fos.write(restoreBytes);
+ fos.close();
+
+ provider.onConfigured(mockConfigurationContext);
+ }
+
+ @Test
+ public void testPostContructionWhenPrimaryAndBackupDoNotExist() throws Exception {
+
+ provider.onConfigured(mockConfigurationContext);
+ assertEquals(0, restore.length());
+ assertEquals(restore.length(), primary.length());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/build-order.sh
----------------------------------------------------------------------
diff --git a/misc/build-order.sh b/misc/build-order.sh
new file mode 100755
index 0000000..855321a
--- /dev/null
+++ b/misc/build-order.sh
@@ -0,0 +1,79 @@
+#MAVEN_FLAGS="-Dmaven.test.skip=true"
+MAVEN_FLAGS=""
+
+cd misc/nar-maven-plugin && \
+mvn $MAVEN_FLAGS install && \
+cd ../../commons/nifi-parent && \
+mvn $MAVEN_FLAGS install && \
+cd ../../nifi-api && \
+mvn $MAVEN_FLAGS install && \
+cd ../commons/ && \
+cd nifi-stream-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../wali && \
+mvn $MAVEN_FLAGS install && \
+cd ../flowfile-packager && \
+mvn $MAVEN_FLAGS install && \
+cd ../core-flowfile-attributes && \
+mvn $MAVEN_FLAGS install && \
+cd ../data-provenance-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../naive-search-ring-buffer && \
+mvn $MAVEN_FLAGS install && \
+cd ../nifi-expression-language && \
+mvn $MAVEN_FLAGS install && \
+cd ../nifi-file-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../nifi-logging-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../nifi-properties && \
+mvn $MAVEN_FLAGS install && \
+cd ../nifi-security-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../nifi-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../nifi-socket-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../nifi-web-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../processor-utilities && \
+mvn $MAVEN_FLAGS install && \
+cd ../remote-communications-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../search-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../../extensions/file-authorization-provider && \
+mvn $MAVEN_FLAGS install && \
+cd ../../nifi-mock && \
+mvn $MAVEN_FLAGS install && \
+cd ../nar-bundles/ && \
+cd nar-container-common && \
+mvn $MAVEN_FLAGS install && \
+cd ../jetty-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../standard-services-api-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../ssl-context-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../distributed-cache-services-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../standard-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../hadoop-libraries-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../hadoop-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../volatile-provenance-repository-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../persistent-provenance-repository-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../framework-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../execute-script-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../monitor-threshold-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../update-attribute-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../../assemblies/nifi
+mvn assembly:assembly
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/nar-maven-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/misc/nar-maven-plugin/pom.xml b/misc/nar-maven-plugin/pom.xml
new file mode 100644
index 0000000..3888df3
--- /dev/null
+++ b/misc/nar-maven-plugin/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nar-maven-plugin</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <packaging>maven-plugin</packaging>
+ <name>Apache NiFi NAR Plugin</name>
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+ <build>
+ <defaultGoal>install</defaultGoal>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.5</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-plugin-plugin</artifactId>
+ <version>3.3</version>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-plugin-api</artifactId>
+ <version>2.0.11</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.9</version>
+ <type>maven-plugin</type>
+ </dependency>
+ <dependency>
+ <!-- No code from maven-jar-plugin is actually used; it's included
+ just to simplify the dependencies list. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.maven.plugin-tools</groupId>
+ <artifactId>maven-plugin-annotations</artifactId>
+ <version>3.3</version>
+ </dependency>
+ </dependencies>
+ <distributionManagement>
+ <repository>
+ <id>nifi-releases</id>
+ <url>${nifi.repo.url}</url>
+ </repository>
+ <snapshotRepository>
+ <id>nifi-snapshots</id>
+ <url>${nifi.snapshot.repo.url}</url>
+ </snapshotRepository>
+ </distributionManagement>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java
----------------------------------------------------------------------
diff --git a/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java
new file mode 100644
index 0000000..263fe88
--- /dev/null
+++ b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nifi;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.maven.archiver.MavenArchiveConfiguration;
+import org.apache.maven.archiver.MavenArchiver;
+import org.apache.maven.artifact.Artifact;
+import org.apache.maven.artifact.DependencyResolutionRequiredException;
+import org.apache.maven.artifact.factory.ArtifactFactory;
+import org.apache.maven.artifact.installer.ArtifactInstaller;
+import org.apache.maven.artifact.metadata.ArtifactMetadataSource;
+import org.apache.maven.artifact.repository.ArtifactRepository;
+import org.apache.maven.artifact.repository.ArtifactRepositoryFactory;
+import org.apache.maven.artifact.resolver.ArtifactCollector;
+import org.apache.maven.artifact.resolver.ArtifactNotFoundException;
+import org.apache.maven.artifact.resolver.ArtifactResolutionException;
+import org.apache.maven.artifact.resolver.ArtifactResolver;
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugin.MojoFailureException;
+import org.apache.maven.plugin.dependency.utils.DependencyStatusSets;
+import org.apache.maven.plugin.dependency.utils.DependencyUtil;
+import org.apache.maven.plugin.dependency.utils.filters.DestFileFilter;
+import org.apache.maven.plugin.dependency.utils.resolvers.ArtifactsResolver;
+import org.apache.maven.plugin.dependency.utils.resolvers.DefaultArtifactsResolver;
+import org.apache.maven.plugin.dependency.utils.translators.ArtifactTranslator;
+import org.apache.maven.plugin.dependency.utils.translators.ClassifierTypeTranslator;
+import org.apache.maven.plugins.annotations.LifecyclePhase;
+import org.apache.maven.plugins.annotations.Mojo;
+import org.apache.maven.plugins.annotations.Parameter;
+import org.apache.maven.plugins.annotations.ResolutionScope;
+import org.apache.maven.project.MavenProject;
+import org.apache.maven.execution.MavenSession;
+import org.apache.maven.plugins.annotations.Component;
+import org.apache.maven.project.MavenProjectHelper;
+import org.apache.maven.shared.artifact.filter.collection.ArtifactFilterException;
+import org.apache.maven.shared.artifact.filter.collection.ArtifactIdFilter;
+import org.apache.maven.shared.artifact.filter.collection.ArtifactsFilter;
+import org.apache.maven.shared.artifact.filter.collection.ClassifierFilter;
+import org.apache.maven.shared.artifact.filter.collection.FilterArtifacts;
+import org.apache.maven.shared.artifact.filter.collection.GroupIdFilter;
+import org.apache.maven.shared.artifact.filter.collection.ScopeFilter;
+import org.apache.maven.shared.artifact.filter.collection.ProjectTransitivityFilter;
+import org.apache.maven.shared.artifact.filter.collection.TypeFilter;
+import org.codehaus.plexus.archiver.ArchiverException;
+import org.codehaus.plexus.archiver.jar.JarArchiver;
+import org.codehaus.plexus.archiver.jar.ManifestException;
+import org.codehaus.plexus.archiver.manager.ArchiverManager;
+import org.codehaus.plexus.util.FileUtils;
+import org.codehaus.plexus.util.StringUtils;
+
+/**
+ * Packages the current project as an Apache NiFi Archive (NAR).
+ *
+ * The following code is derived from maven-dependencies-plugin and
+ * maven-jar-plugin. The functionality of CopyDependenciesMojo and JarMojo was
+ * simplified to the use case of NarMojo.
+ *
+ */
+@Mojo(name = "nar", defaultPhase = LifecyclePhase.PACKAGE, threadSafe = false, requiresDependencyResolution = ResolutionScope.RUNTIME)
+public class NarMojo extends AbstractMojo {
+
+ private static final String[] DEFAULT_EXCLUDES = new String[]{"**/package.html"};
+ private static final String[] DEFAULT_INCLUDES = new String[]{"**/**"};
+
+ /**
+ * POM
+ *
+ */
+ @Parameter(property = "project", readonly = true, required = true)
+ protected MavenProject project;
+
+ @Parameter(property = "session", readonly = true, required = true)
+ protected MavenSession session;
+
+ /**
+ * List of files to include. Specified as fileset patterns.
+ */
+ @Parameter(property = "includes")
+ protected String[] includes;
+ /**
+ * List of files to exclude. Specified as fileset patterns.
+ */
+ @Parameter(property = "excludes")
+ protected String[] excludes;
+ /**
+ * Name of the generated NAR.
+ *
+ */
+ @Parameter(alias = "narName", property = "nar.finalName", defaultValue = "${project.build.finalName}", required = true)
+ protected String finalName;
+
+ /**
+ * The Jar archiver.
+ *
+ * \@\component role="org.codehaus.plexus.archiver.Archiver" roleHint="jar"
+ */
+ @Component(role = org.codehaus.plexus.archiver.Archiver.class, hint = "jar")
+ private JarArchiver jarArchiver;
+ /**
+ * The archive configuration to use.
+ *
+ * See <a
+ * href="http://maven.apache.org/shared/maven-archiver/index.html">the
+ * documentation for Maven Archiver</a>.
+ *
+ */
+ @Parameter(property = "archive")
+ protected final MavenArchiveConfiguration archive = new MavenArchiveConfiguration();
+ /**
+ * Path to the default MANIFEST file to use. It will be used if
+ * <code>useDefaultManifestFile</code> is set to <code>true</code>.
+ *
+ */
+ @Parameter(property = "defaultManifestFiles", defaultValue = "${project.build.outputDirectory}/META-INF/MANIFEST.MF", readonly = true, required = true)
+ protected File defaultManifestFile;
+
+ /**
+ * Set this to <code>true</code> to enable the use of the
+ * <code>defaultManifestFile</code>.
+ *
+ * @since 2.2
+ */
+ @Parameter(property = "nar.useDefaultManifestFile", defaultValue = "false")
+ protected boolean useDefaultManifestFile;
+
+ @Component
+ protected MavenProjectHelper projectHelper;
+
+ /**
+ * Whether creating the archive should be forced.
+ *
+ */
+ @Parameter(property = "nar.forceCreation", defaultValue = "false")
+ protected boolean forceCreation;
+
+ /**
+ * Classifier to add to the artifact generated. If given, the artifact will
+ * be an attachment instead.
+ *
+ */
+ @Parameter(property = "classifier")
+ protected String classifier;
+
+ @Component
+ protected ArtifactInstaller installer;
+
+ @Component
+ protected ArtifactRepositoryFactory repositoryFactory;
+
+ /**
+ * This only applies if the classifier parameter is used.
+ *
+ */
+ @Parameter(property = "mdep.failOnMissingClassifierArtifact", defaultValue = "true", required = false)
+ protected boolean failOnMissingClassifierArtifact = true;
+
+ /**
+ * Comma Separated list of Types to include. Empty String indicates include
+ * everything (default).
+ *
+ */
+ @Parameter(property = "includeTypes", required = false)
+ protected String includeTypes;
+
+ /**
+ * Comma Separated list of Types to exclude. Empty String indicates don't
+ * exclude anything (default).
+ *
+ */
+ @Parameter(property = "excludeTypes", required = false)
+ protected String excludeTypes;
+
+ /**
+ * Scope to include. An Empty string indicates all scopes (default).
+ *
+ */
+ @Parameter(property = "includeScope", required = false)
+ protected String includeScope;
+
+ /**
+ * Scope to exclude. An Empty string indicates no scopes (default).
+ *
+ */
+ @Parameter(property = "excludeScope", required = false)
+ protected String excludeScope;
+
+ /**
+ * Comma Separated list of Classifiers to include. Empty String indicates
+ * include everything (default).
+ *
+ */
+ @Parameter(property = "includeClassifiers", required = false)
+ protected String includeClassifiers;
+
+ /**
+ * Comma Separated list of Classifiers to exclude. Empty String indicates
+ * don't exclude anything (default).
+ *
+ */
+ @Parameter(property = "excludeClassifiers", required = false)
+ protected String excludeClassifiers;
+
+ /**
+ * Specify classifier to look for. Example: sources
+ *
+ */
+ @Parameter(property = "classifier", required = false)
+ protected String copyDepClassifier;
+
+ /**
+ * Specify type to look for when constructing artifact based on classifier.
+ * Example: java-source,jar,war, nar
+ *
+ */
+ @Parameter(property = "type", required = false, defaultValue = "nar")
+ protected String type;
+
+ /**
+ * Comma separated list of Artifact names too exclude.
+ *
+ */
+ @Parameter(property = "excludeArtifacts", required = false)
+ protected String excludeArtifactIds;
+
+ /**
+ * Comma separated list of Artifact names to include.
+ *
+ */
+ @Parameter(property = "includeArtifacts", required = false)
+ protected String includeArtifactIds;
+
+ /**
+ * Comma separated list of GroupId Names to exclude.
+ *
+ */
+ @Parameter(property = "excludeArtifacts", required = false)
+ protected String excludeGroupIds;
+
+ /**
+ * Comma separated list of GroupIds to include.
+ *
+ */
+ @Parameter(property = "includeGroupIds", required = false)
+ protected String includeGroupIds;
+
+ /**
+ * Directory to store flag files
+ *
+ */
+ @Parameter(property = "markersDirectory", required = false, defaultValue = "${project.build.directory}/dependency-maven-plugin-markers")
+ protected File markersDirectory;
+
+ /**
+ * Overwrite release artifacts
+ *
+ */
+ @Parameter(property = "overWriteReleases", required = false)
+ protected boolean overWriteReleases;
+
+ /**
+ * Overwrite snapshot artifacts
+ *
+ */
+ @Parameter(property = "overWriteSnapshots", required = false)
+ protected boolean overWriteSnapshots;
+
+ /**
+ * Overwrite artifacts that don't exist or are older than the source.
+ *
+ */
+ @Parameter(property = "overWriteIfNewer", required = false, defaultValue = "true")
+ protected boolean overWriteIfNewer;
+
+ /**
+ * Used to look up Artifacts in the remote repository.
+ */
+ @Component
+ protected ArtifactFactory factory;
+
+ /**
+ * Used to look up Artifacts in the remote repository.
+ *
+ */
+ @Component
+ protected ArtifactResolver resolver;
+
+ /**
+ * Artifact collector, needed to resolve dependencies.
+ *
+ */
+ @Component(role = org.apache.maven.artifact.resolver.ArtifactCollector.class)
+ protected ArtifactCollector artifactCollector;
+
+ @Component(role = org.apache.maven.artifact.metadata.ArtifactMetadataSource.class)
+ protected ArtifactMetadataSource artifactMetadataSource;
+
+ /**
+ * Location of the local repository.
+ *
+ */
+ @Parameter(property = "localRepository", required = true, readonly = true)
+ protected ArtifactRepository local;
+
+ /**
+ * List of Remote Repositories used by the resolver
+ *
+ */
+ @Parameter(property = "project.remoteArtifactRepositories", required = true, readonly = true)
+ protected List remoteRepos;
+
+ /**
+ * To look up Archiver/UnArchiver implementations
+ *
+ */
+ @Component
+ protected ArchiverManager archiverManager;
+
+ /**
+ * Contains the full list of projects in the reactor.
+ *
+ */
+ @Parameter(property = "reactorProjects", required = true, readonly = true)
+ protected List reactorProjects;
+
+ /**
+ * If the plugin should be silent.
+ *
+ */
+ @Parameter(property = "silent", required = false, defaultValue = "false")
+ public boolean silent;
+
+ /**
+ * Output absolute filename for resolved artifacts
+ *
+ */
+ @Parameter(property = "outputAbsoluteArtifactFilename", defaultValue = "false", required = false)
+ protected boolean outputAbsoluteArtifactFilename;
+
+ @Override
+ public void execute() throws MojoExecutionException, MojoFailureException {
+ copyDependencies();
+ makeNar();
+ }
+
+ private void copyDependencies() throws MojoExecutionException {
+ DependencyStatusSets dss = getDependencySets(this.failOnMissingClassifierArtifact);
+ Set artifacts = dss.getResolvedDependencies();
+
+ for (Object artifactObj : artifacts) {
+ copyArtifact((Artifact) artifactObj);
+ }
+
+ artifacts = dss.getSkippedDependencies();
+ for (Object artifactOjb : artifacts) {
+ Artifact artifact = (Artifact) artifactOjb;
+ getLog().info(artifact.getFile().getName() + " already exists in destination.");
+ }
+ }
+
+ protected void copyArtifact(Artifact artifact) throws MojoExecutionException {
+ String destFileName = DependencyUtil.getFormattedFileName(artifact, false);
+ final File destDir = DependencyUtil.getFormattedOutputDirectory(false, false, false, false, false, getDependenciesDirectory(), artifact);
+ final File destFile = new File(destDir, destFileName);
+ copyFile(artifact.getFile(), destFile);
+ }
+
+ protected Artifact getResolvedPomArtifact(Artifact artifact) {
+ Artifact pomArtifact = this.factory.createArtifact(artifact.getGroupId(), artifact.getArtifactId(), artifact.getVersion(), "", "pom");
+ // Resolve the pom artifact using repos
+ try {
+ this.resolver.resolve(pomArtifact, this.remoteRepos, this.local);
+ } catch (ArtifactResolutionException | ArtifactNotFoundException e) {
+ getLog().info(e.getMessage());
+ }
+ return pomArtifact;
+ }
+
+ protected ArtifactsFilter getMarkedArtifactFilter() {
+ return new DestFileFilter(this.overWriteReleases, this.overWriteSnapshots, this.overWriteIfNewer, false, false, false, false, false, getDependenciesDirectory());
+ }
+
+ protected DependencyStatusSets getDependencySets(boolean stopOnFailure) throws MojoExecutionException {
+ // add filters in well known order, least specific to most specific
+ FilterArtifacts filter = new FilterArtifacts();
+
+ filter.addFilter(new ProjectTransitivityFilter(project.getDependencyArtifacts(), false));
+ filter.addFilter(new ScopeFilter(this.includeScope, this.excludeScope));
+ filter.addFilter(new TypeFilter(this.includeTypes, this.excludeTypes));
+ filter.addFilter(new ClassifierFilter(this.includeClassifiers, this.excludeClassifiers));
+ filter.addFilter(new GroupIdFilter(this.includeGroupIds, this.excludeGroupIds));
+ filter.addFilter(new ArtifactIdFilter(this.includeArtifactIds, this.excludeArtifactIds));
+
+ // explicitly filter our nar dependencies
+ filter.addFilter(new TypeFilter("", "nar"));
+
+ // start with all artifacts.
+ Set artifacts = project.getArtifacts();
+
+ // perform filtering
+ try {
+ artifacts = filter.filter(artifacts);
+ } catch (ArtifactFilterException e) {
+ throw new MojoExecutionException(e.getMessage(), e);
+ }
+
+ // transform artifacts if classifier is set
+ final DependencyStatusSets status;
+ if (StringUtils.isNotEmpty(copyDepClassifier)) {
+ status = getClassifierTranslatedDependencies(artifacts, stopOnFailure);
+ } else {
+ status = filterMarkedDependencies(artifacts);
+ }
+
+ return status;
+ }
+
+ protected DependencyStatusSets getClassifierTranslatedDependencies(Set artifacts, boolean stopOnFailure) throws MojoExecutionException {
+ Set unResolvedArtifacts = new HashSet();
+ Set resolvedArtifacts = artifacts;
+ DependencyStatusSets status = new DependencyStatusSets();
+
+ // possibly translate artifacts into a new set of artifacts based on the
+ // classifier and type
+ // if this did something, we need to resolve the new artifacts
+ if (StringUtils.isNotEmpty(copyDepClassifier)) {
+ ArtifactTranslator translator = new ClassifierTypeTranslator(this.copyDepClassifier, this.type, this.factory);
+ artifacts = translator.translate(artifacts, getLog());
+
+ status = filterMarkedDependencies(artifacts);
+
+ // the unskipped artifacts are in the resolved set.
+ artifacts = status.getResolvedDependencies();
+
+ // resolve the rest of the artifacts
+ ArtifactsResolver artifactsResolver = new DefaultArtifactsResolver(this.resolver, this.local,
+ this.remoteRepos, stopOnFailure);
+ resolvedArtifacts = artifactsResolver.resolve(artifacts, getLog());
+
+ // calculate the artifacts not resolved.
+ unResolvedArtifacts.addAll(artifacts);
+ unResolvedArtifacts.removeAll(resolvedArtifacts);
+ }
+
+ // return a bean of all 3 sets.
+ status.setResolvedDependencies(resolvedArtifacts);
+ status.setUnResolvedDependencies(unResolvedArtifacts);
+
+ return status;
+ }
+
+ protected DependencyStatusSets filterMarkedDependencies(Set artifacts) throws MojoExecutionException {
+ // remove files that have markers already
+ FilterArtifacts filter = new FilterArtifacts();
+ filter.clearFilters();
+ filter.addFilter(getMarkedArtifactFilter());
+
+ Set unMarkedArtifacts;
+ try {
+ unMarkedArtifacts = filter.filter(artifacts);
+ } catch (ArtifactFilterException e) {
+ throw new MojoExecutionException(e.getMessage(), e);
+ }
+
+ // calculate the skipped artifacts
+ Set skippedArtifacts = new HashSet();
+ skippedArtifacts.addAll(artifacts);
+ skippedArtifacts.removeAll(unMarkedArtifacts);
+
+ return new DependencyStatusSets(unMarkedArtifacts, null, skippedArtifacts);
+ }
+
+ protected void copyFile(File artifact, File destFile) throws MojoExecutionException {
+ try {
+ getLog().info("Copying " + (this.outputAbsoluteArtifactFilename ? artifact.getAbsolutePath() : artifact.getName()) + " to " + destFile);
+ FileUtils.copyFile(artifact, destFile);
+ } catch (Exception e) {
+ throw new MojoExecutionException("Error copying artifact from " + artifact + " to " + destFile, e);
+ }
+ }
+
+ private File getClassesDirectory() {
+ final File outputDirectory = new File(project.getBasedir(), "target");
+ return new File(outputDirectory, "classes");
+ }
+
+ private File getDependenciesDirectory() {
+ return new File(getClassesDirectory(), "META-INF/dependencies");
+ }
+
+ private void makeNar() throws MojoExecutionException {
+ File narFile = createArchive();
+
+ if (classifier != null) {
+ projectHelper.attachArtifact(project, "nar", classifier, narFile);
+ } else {
+ project.getArtifact().setFile(narFile);
+ }
+ }
+
+ public File createArchive() throws MojoExecutionException {
+ final File outputDirectory = new File(project.getBasedir(), "target");
+ File narFile = getNarFile(outputDirectory, finalName, classifier);
+ MavenArchiver archiver = new MavenArchiver();
+ archiver.setArchiver(jarArchiver);
+ archiver.setOutputFile(narFile);
+ archive.setForced(forceCreation);
+
+ try {
+ File contentDirectory = getClassesDirectory();
+ if (!contentDirectory.exists()) {
+ getLog().warn("NAR will be empty - no content was marked for inclusion!");
+ } else {
+ archiver.getArchiver().addDirectory(contentDirectory, getIncludes(), getExcludes());
+ }
+
+ File existingManifest = defaultManifestFile;
+ if (useDefaultManifestFile && existingManifest.exists() && archive.getManifestFile() == null) {
+ getLog().info("Adding existing MANIFEST to archive. Found under: " + existingManifest.getPath());
+ archive.setManifestFile(existingManifest);
+ }
+
+ // automatically add the artifact id to the manifest
+ archive.addManifestEntry("Nar-Id", project.getArtifactId());
+
+ // look for a nar dependency
+ String narDependency = getNarDependency();
+ if (narDependency != null) {
+ archive.addManifestEntry("Nar-Dependency-Id", narDependency);
+ }
+
+ archiver.createArchive(session, project, archive);
+ return narFile;
+ } catch (ArchiverException | MojoExecutionException | ManifestException | IOException | DependencyResolutionRequiredException e) {
+ throw new MojoExecutionException("Error assembling NAR", e);
+ }
+ }
+
+ private String[] getIncludes() {
+ if (includes != null && includes.length > 0) {
+ return includes;
+ }
+ return DEFAULT_INCLUDES;
+ }
+
+ private String[] getExcludes() {
+ if (excludes != null && excludes.length > 0) {
+ return excludes;
+ }
+ return DEFAULT_EXCLUDES;
+ }
+
+ protected File getNarFile(File basedir, String finalName, String classifier) {
+ if (classifier == null) {
+ classifier = "";
+ } else if (classifier.trim().length() > 0 && !classifier.startsWith("-")) {
+ classifier = "-" + classifier;
+ }
+
+ return new File(basedir, finalName + classifier + ".nar");
+ }
+
+ private String getNarDependency() throws MojoExecutionException {
+ String narDependency = null;
+
+ // get nar dependencies
+ FilterArtifacts filter = new FilterArtifacts();
+ filter.addFilter(new TypeFilter("nar", ""));
+
+ // start with all artifacts.
+ Set artifacts = project.getArtifacts();
+
+ // perform filtering
+ try {
+ artifacts = filter.filter(artifacts);
+ } catch (ArtifactFilterException e) {
+ throw new MojoExecutionException(e.getMessage(), e);
+ }
+
+ // ensure there is a single nar dependency
+ if (artifacts.size() > 1) {
+ throw new MojoExecutionException("Each NAR represents a ClassLoader. A NAR dependency allows that NAR's ClassLoader to be "
+ + "used as the parent of this NAR's ClassLoader. As a result, only a single NAR dependency is allowed.");
+ } else if (artifacts.size() == 1) {
+ final Artifact artifact = (Artifact) artifacts.iterator().next();
+ narDependency = artifact.getArtifactId();
+ }
+
+ return narDependency;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml
----------------------------------------------------------------------
diff --git a/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml b/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml
new file mode 100644
index 0000000..0680d18
--- /dev/null
+++ b/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<component-set>
+ <components>
+ <component>
+ <role>org.apache.maven.lifecycle.mapping.LifecycleMapping</role>
+ <role-hint>nar</role-hint>
+ <implementation>org.apache.maven.lifecycle.mapping.DefaultLifecycleMapping</implementation>
+ <configuration>
+ <lifecycles>
+ <lifecycle>
+ <id>default</id>
+ <phases>
+ <process-resources>org.apache.maven.plugins:maven-resources-plugin:resources</process-resources>
+ <compile>org.apache.maven.plugins:maven-compiler-plugin:compile</compile>
+ <process-test-resources>org.apache.maven.plugins:maven-resources-plugin:testResources</process-test-resources>
+ <test-compile>org.apache.maven.plugins:maven-compiler-plugin:testCompile</test-compile>
+ <test>org.apache.maven.plugins:maven-surefire-plugin:test</test>
+ <package>org.apache.nifi:nar-maven-plugin:nar</package>
+ <install>org.apache.maven.plugins:maven-install-plugin:install</install>
+ <deploy>org.apache.maven.plugins:maven-deploy-plugin:deploy</deploy>
+ </phases>
+ </lifecycle>
+ </lifecycles>
+ </configuration>
+ </component>
+ <component>
+ <role>org.apache.maven.artifact.handler.ArtifactHandler</role>
+ <role-hint>nar</role-hint>
+ <implementation>org.apache.maven.artifact.handler.DefaultArtifactHandler</implementation>
+ <configuration>
+ <type>nar</type>
+ <language>java</language>
+ <addedToClasspath>false</addedToClasspath>
+ <includesDependencies>true</includesDependencies>
+ </configuration>
+ </component>
+ </components>
+</component-set>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
new file mode 100644
index 0000000..6280349
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
@@ -0,0 +1,67 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-services-bundle</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>distributed-cache-client-service</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Distributed Cache Client Service</name>
+ <description>Provides a Client for interfacing with a Distributed Cache</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-client-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>distributed-cache-protocol</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>remote-communications-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-processor-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-stream-utils</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>ssl-context-service-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.9</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
new file mode 100644
index 0000000..f838c2f
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+public interface CommsSession extends Closeable {
+
+ void setTimeout(final long value, final TimeUnit timeUnit);
+
+ InputStream getInputStream() throws IOException;
+
+ OutputStream getOutputStream() throws IOException;
+
+ boolean isClosed();
+
+ void interrupt();
+
+ String getHostname();
+
+ int getPort();
+
+ long getTimeout(TimeUnit timeUnit);
+
+ SSLContext getSSLContext();
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
new file mode 100644
index 0000000..ee96660
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.ByteArrayOutputStream;
+import org.apache.nifi.io.DataOutputStream;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
+
+ public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+ .name("Server Hostname")
+ .description("The name of the server that is running the DistributedMapCacheServer service")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+ .name("Server Port")
+ .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .defaultValue("4557")
+ .build();
+ public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("SSL Context Service")
+ .description(
+ "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+ .required(false)
+ .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+ .defaultValue(null)
+ .build();
+ public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
+ .name("Communications Timeout")
+ .description(
+ "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .defaultValue("30 secs")
+ .build();
+
+ private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
+ private volatile ConfigurationContext configContext;
+ private volatile boolean closed = false;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(HOSTNAME);
+ descriptors.add(PORT);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ descriptors.add(COMMUNICATIONS_TIMEOUT);
+ return descriptors;
+ }
+
+ @OnConfigured
+ public void cacheConfig(final ConfigurationContext context) {
+ this.configContext = context;
+ }
+
+ @Override
+ public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
+ throws IOException {
+ return withCommsSession(new CommsAction<Boolean>() {
+ @Override
+ public Boolean execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("putIfAbsent");
+
+ serialize(key, keySerializer, dos);
+ serialize(value, valueSerializer, dos);
+
+ dos.flush();
+
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ return dis.readBoolean();
+ }
+ });
+ }
+
+ @Override
+ public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+ return withCommsSession(new CommsAction<Boolean>() {
+ @Override
+ public Boolean execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("containsKey");
+
+ serialize(key, keySerializer, dos);
+ dos.flush();
+
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ return dis.readBoolean();
+ }
+ });
+ }
+
+ @Override
+ public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+ final Deserializer<V> valueDeserializer) throws IOException {
+ return withCommsSession(new CommsAction<V>() {
+ @Override
+ public V execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("getAndPutIfAbsent");
+
+ serialize(key, keySerializer, dos);
+ serialize(value, valueSerializer, dos);
+ dos.flush();
+
+ // read response
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+ return valueDeserializer.deserialize(responseBuffer);
+ }
+ });
+ }
+
+ @Override
+ public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+ return withCommsSession(new CommsAction<V>() {
+ @Override
+ public V execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("get");
+
+ serialize(key, keySerializer, dos);
+ dos.flush();
+
+ // read response
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+ return valueDeserializer.deserialize(responseBuffer);
+ }
+ });
+ }
+
+ @Override
+ public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+ return withCommsSession(new CommsAction<Boolean>() {
+ @Override
+ public Boolean execute(final CommsSession session) throws IOException {
+ final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+ dos.writeUTF("remove");
+
+ serialize(key, serializer, dos);
+ dos.flush();
+
+ // read response
+ final DataInputStream dis = new DataInputStream(session.getInputStream());
+ return dis.readBoolean();
+ }
+ });
+ }
+
+ private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
+ final int responseLength = dis.readInt();
+ final byte[] responseBuffer = new byte[responseLength];
+ dis.readFully(responseBuffer);
+ return responseBuffer;
+ }
+
+ public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
+ final String hostname = context.getProperty(HOSTNAME).getValue();
+ final int port = context.getProperty(PORT).asInteger();
+ final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+ final CommsSession commsSession;
+ if (sslContextService == null) {
+ commsSession = new StandardCommsSession(hostname, port);
+ } else {
+ commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+ }
+
+ commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+ return commsSession;
+ }
+
+ private CommsSession leaseCommsSession() throws IOException {
+ CommsSession session = queue.poll();
+ if (session != null && !session.isClosed()) {
+ return session;
+ }
+
+ session = createCommsSession(configContext);
+ final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+ try {
+ ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+ } catch (final HandshakeException e) {
+ try {
+ session.close();
+ } catch (final IOException ioe) {
+ }
+
+ throw new IOException(e);
+ }
+
+ return session;
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.closed = true;
+
+ CommsSession commsSession;
+ while ((commsSession = queue.poll()) != null) {
+ try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
+ dos.writeUTF("close");
+ dos.flush();
+ commsSession.close();
+ } catch (final IOException e) {
+ }
+ }
+ logger.info("Closed {}", new Object[] { getIdentifier() });
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ if (!closed)
+ close();
+ logger.debug("Finalize called");
+ }
+
+ private <T> void serialize(final T value, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(value, baos);
+ dos.writeInt(baos.size());
+ baos.writeTo(dos);
+ }
+
+ private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
+ if (closed) {
+ throw new IllegalStateException("Client is closed");
+ }
+
+ final CommsSession session = leaseCommsSession();
+ try {
+ return action.execute(session);
+ } catch (final IOException ioe) {
+ try {
+ session.close();
+ } catch (final IOException ignored) {
+ }
+
+ throw ioe;
+ } finally {
+ if (!session.isClosed()) {
+ if (this.closed) {
+ try {
+ session.close();
+ } catch (final IOException ioe) {
+ }
+ } else {
+ queue.offer(session);
+ }
+ }
+ }
+ }
+
+ private static interface CommsAction<T> {
+ T execute(CommsSession commsSession) throws IOException;
+ }
+
+}