You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:58 UTC

[35/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java
----------------------------------------------------------------------
diff --git a/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java b/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java
new file mode 100644
index 0000000..55f59ba
--- /dev/null
+++ b/extensions/file-authorization-provider/src/main/java/org/apache/nifi/authorization/FileAuthorizationProvider.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Set;
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import org.apache.nifi.authorization.annotation.AuthorityProviderContext;
+import org.apache.nifi.authorization.exception.AuthorityAccessException;
+import org.apache.nifi.authorization.exception.IdentityAlreadyExistsException;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.authorization.exception.UnknownIdentityException;
+import org.apache.nifi.file.FileUtils;
+import org.apache.nifi.user.generated.ObjectFactory;
+import org.apache.nifi.user.generated.Role;
+import org.apache.nifi.user.generated.User;
+import org.apache.nifi.user.generated.Users;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+/**
+ * Provides identity checks and grants authorities.
+ */
+public class FileAuthorizationProvider implements AuthorityProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(FileAuthorizationProvider.class);
+    private static final String USERS_XSD = "/users.xsd";
+    private static final String JAXB_GENERATED_PATH = "org.apache.nifi.user.generated";
+    private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    /**
+     * Load the JAXBContext.
+     */
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_GENERATED_PATH, FileAuthorizationProvider.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.");
+        }
+    }
+
+    private NiFiProperties properties;
+    private File usersFile;
+    private File restoreUsersFile;
+    private Users users;
+    private final Set<String> defaultAuthorities = new HashSet<>();
+
+    @Override
+    public void initialize(final AuthorityProviderInitializationContext initializationContext) throws ProviderCreationException {
+    }
+
+    @Override
+    public void onConfigured(final AuthorityProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        try {
+            final String usersFilePath = configurationContext.getProperty("Authorized Users File");
+            if (usersFilePath == null || usersFilePath.trim().isEmpty()) {
+                throw new ProviderCreationException("The authorized users file must be specified.");
+            }
+
+            // the users file instance will never be null because a default is used
+            usersFile = new File(usersFilePath);
+            final File usersFileDirectory = usersFile.getParentFile();
+
+            // the restore directory is optional and may be null
+            final File restoreDirectory = properties.getRestoreDirectory();
+
+            if (restoreDirectory != null) {
+
+                // sanity check that restore directory is a directory, creating it if necessary
+                FileUtils.ensureDirectoryExistAndCanAccess(restoreDirectory);
+
+                // check that restore directory is not the same as the primary directory
+                if (usersFileDirectory.getAbsolutePath().equals(restoreDirectory.getAbsolutePath())) {
+                    throw new ProviderCreationException(String.format("Authorized User's directory '%s' is the same as restore directory '%s' ",
+                            usersFileDirectory.getAbsolutePath(), restoreDirectory.getAbsolutePath()));
+                }
+
+                // the restore copy will have same file name, but reside in a different directory
+                restoreUsersFile = new File(restoreDirectory, usersFile.getName());
+
+                // sync the primary copy with the restore copy
+                try {
+                    FileUtils.syncWithRestore(usersFile, restoreUsersFile, logger);
+                } catch (final IOException | IllegalStateException ioe) {
+                    throw new ProviderCreationException(ioe);
+                }
+
+            }
+
+            // load the users from the specified file
+            if (usersFile.exists()) {
+                // find the schema
+                final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+                final Schema schema = schemaFactory.newSchema(FileAuthorizationProvider.class.getResource(USERS_XSD));
+
+                // attempt to unmarshal
+                final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+                unmarshaller.setSchema(schema);
+                final JAXBElement<Users> element = unmarshaller.unmarshal(new StreamSource(usersFile), Users.class);
+                users = element.getValue();
+            } else {
+                final ObjectFactory objFactory = new ObjectFactory();
+                users = objFactory.createUsers();
+            }
+
+            // attempt to load a default roles
+            final String rawDefaultAuthorities = configurationContext.getProperty("Default User Roles");
+            if (StringUtils.isNotBlank(rawDefaultAuthorities)) {
+                final Set<String> invalidDefaultAuthorities = new HashSet<>();
+
+                // validate the specified authorities
+                final String[] rawDefaultAuthorityList = rawDefaultAuthorities.split(",");
+                for (String rawAuthority : rawDefaultAuthorityList) {
+                    rawAuthority = rawAuthority.trim();
+                    final Authority authority = Authority.valueOfAuthority(rawAuthority);
+                    if (authority == null) {
+                        invalidDefaultAuthorities.add(rawAuthority);
+                    } else {
+                        defaultAuthorities.add(rawAuthority);
+                    }
+                }
+
+                // report any unrecognized authorities
+                if (!invalidDefaultAuthorities.isEmpty()) {
+                    logger.warn(String.format("The following default role(s) '%s' were not recognized. Possible values: %s.",
+                            StringUtils.join(invalidDefaultAuthorities, ", "), StringUtils.join(Authority.getRawAuthorities(), ", ")));
+                }
+            }
+        } catch (IOException | ProviderCreationException | SAXException | JAXBException e) {
+            throw new ProviderCreationException(e);
+        }
+
+    }
+
+    @Override
+    public void preDestruction() {
+    }
+
+    /**
+     * Determines if this provider has a default role.
+     *
+     * @return
+     */
+    private boolean hasDefaultRoles() {
+        return !defaultAuthorities.isEmpty();
+    }
+
+    /**
+     * Determines if the specified dn is known to this authority provider. When
+     * this provider is configured to have default role(s), all dn are
+     * considered to exist.
+     *
+     * @param dn
+     * @return True if he dn is known, false otherwise
+     */
+    @Override
+    public boolean doesDnExist(String dn) throws AuthorityAccessException {
+        if (hasDefaultRoles()) {
+            return true;
+        }
+
+        final User user = getUser(dn);
+        return user != null;
+    }
+
+    /**
+     * Loads the authorities for the specified user. If this provider is
+     * configured for default user role(s) and a non existent dn is specified, a
+     * new user will be automatically created with the default role(s).
+     *
+     * @param dn
+     * @return
+     * @throws UnknownIdentityException
+     * @throws AuthorityAccessException
+     */
+    @Override
+    public synchronized Set<Authority> getAuthorities(String dn) throws UnknownIdentityException, AuthorityAccessException {
+        final Set<Authority> authorities = EnumSet.noneOf(Authority.class);
+
+        // get the user 
+        final User user = getUser(dn);
+
+        // ensure the user was located
+        if (user == null) {
+            if (hasDefaultRoles()) {
+                logger.debug(String.format("User DN not found: %s. Creating new user with default roles.", dn));
+
+                // create the user (which will automatically add any default authorities)
+                addUser(dn, null);
+
+                // get the authorities for the newly created user
+                authorities.addAll(getAuthorities(dn));
+            } else {
+                throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+            }
+        } else {
+            // create the authorities that this user has
+            for (final Role role : user.getRole()) {
+                authorities.add(Authority.valueOfAuthority(role.getName()));
+            }
+        }
+
+        return authorities;
+    }
+
+    /**
+     * Adds the specified authorities to the specified user. Regardless of
+     * whether this provider is configured for a default user role, when a non
+     * existent dn is specified, an UnknownIdentityException will be thrown.
+     *
+     * @param dn
+     * @param authorities
+     * @throws UnknownIdentityException
+     * @throws AuthorityAccessException
+     */
+    @Override
+    public synchronized void setAuthorities(String dn, Set<Authority> authorities) throws UnknownIdentityException, AuthorityAccessException {
+        // get the user
+        final User user = getUser(dn);
+
+        // ensure the user was located
+        if (user == null) {
+            throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+        }
+
+        // add the user authorities
+        setUserAuthorities(user, authorities);
+
+        try {
+            // save the file
+            save();
+        } catch (Exception e) {
+            throw new AuthorityAccessException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Adds the specified authorities to the specified user.
+     *
+     * @param user
+     * @param authorities
+     */
+    private void setUserAuthorities(final User user, final Set<Authority> authorities) {
+        // clear the existing rules
+        user.getRole().clear();
+
+        // set the new roles
+        final ObjectFactory objFactory = new ObjectFactory();
+        for (final Authority authority : authorities) {
+            final Role role = objFactory.createRole();
+            role.setName(authority.toString());
+
+            // add the new role
+            user.getRole().add(role);
+        }
+    }
+
+    /**
+     * Adds the specified user. If this provider is configured with default
+     * role(s) they will be added to the new user.
+     *
+     * @param dn
+     * @param group
+     * @throws UnknownIdentityException
+     * @throws AuthorityAccessException
+     */
+    @Override
+    public synchronized void addUser(String dn, String group) throws IdentityAlreadyExistsException, AuthorityAccessException {
+        final User user = getUser(dn);
+
+        // ensure the user doesn't already exist
+        if (user != null) {
+            throw new IdentityAlreadyExistsException(String.format("User DN already exists: %s", dn));
+        }
+
+        // create the new user
+        final ObjectFactory objFactory = new ObjectFactory();
+        final User newUser = objFactory.createUser();
+
+        // set the user properties
+        newUser.setDn(dn);
+        newUser.setGroup(group);
+
+        // add default roles if appropriate
+        if (hasDefaultRoles()) {
+            for (final String authority : defaultAuthorities) {
+                Role role = objFactory.createRole();
+                role.setName(authority);
+
+                // add the role
+                newUser.getRole().add(role);
+            }
+        }
+
+        // add the user
+        users.getUser().add(newUser);
+
+        try {
+            // save the file
+            save();
+        } catch (Exception e) {
+            throw new AuthorityAccessException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Gets the users for the specified authority.
+     *
+     * @param authority
+     * @return
+     * @throws AuthorityAccessException
+     */
+    @Override
+    public synchronized Set<String> getUsers(Authority authority) throws AuthorityAccessException {
+        final Set<String> userSet = new HashSet<>();
+        for (final User user : users.getUser()) {
+            for (final Role role : user.getRole()) {
+                if (role.getName().equals(authority.toString())) {
+                    userSet.add(user.getDn());
+                }
+            }
+        }
+        return userSet;
+    }
+
+    /**
+     * Removes the specified user. Regardless of whether this provider is
+     * configured for a default user role, when a non existent dn is specified,
+     * an UnknownIdentityException will be thrown.
+     *
+     * @param dn
+     * @throws UnknownIdentityException
+     * @throws AuthorityAccessException
+     */
+    @Override
+    public synchronized void revokeUser(String dn) throws UnknownIdentityException, AuthorityAccessException {
+        // get the user
+        final User user = getUser(dn);
+
+        // ensure the user was located
+        if (user == null) {
+            throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+        }
+
+        // remove the specified user
+        users.getUser().remove(user);
+
+        try {
+            // save the file
+            save();
+        } catch (Exception e) {
+            throw new AuthorityAccessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void setUsersGroup(Set<String> dns, String group) throws UnknownIdentityException, AuthorityAccessException {
+        final Collection<User> groupedUsers = new HashSet<>();
+
+        // get the specified users
+        for (final String dn : dns) {
+            // get the user
+            final User user = getUser(dn);
+
+            // ensure the user was located
+            if (user == null) {
+                throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+            }
+
+            groupedUsers.add(user);
+        }
+
+        // update each user group
+        for (final User user : groupedUsers) {
+            user.setGroup(group);
+        }
+
+        try {
+            // save the file
+            save();
+        } catch (Exception e) {
+            throw new AuthorityAccessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void ungroupUser(String dn) throws UnknownIdentityException, AuthorityAccessException {
+        // get the user
+        final User user = getUser(dn);
+
+        // ensure the user was located
+        if (user == null) {
+            throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+        }
+
+        // remove the users group
+        user.setGroup(null);
+
+        try {
+            // save the file
+            save();
+        } catch (Exception e) {
+            throw new AuthorityAccessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void ungroup(String group) throws AuthorityAccessException {
+        // get the user group
+        final Collection<User> userGroup = getUserGroup(group);
+
+        // ensure the user group was located
+        if (userGroup == null) {
+            return;
+        }
+
+        // update each user group
+        for (final User user : userGroup) {
+            user.setGroup(null);
+        }
+
+        try {
+            // save the file
+            save();
+        } catch (Exception e) {
+            throw new AuthorityAccessException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public String getGroupForUser(String dn) throws UnknownIdentityException, AuthorityAccessException {
+        // get the user
+        final User user = getUser(dn);
+
+        // ensure the user was located
+        if (user == null) {
+            throw new UnknownIdentityException(String.format("User DN not found: %s.", dn));
+        }
+
+        return user.getGroup();
+    }
+
+    @Override
+    public void revokeGroup(String group) throws UnknownIdentityException, AuthorityAccessException {
+        // get the user group
+        final Collection<User> userGroup = getUserGroup(group);
+
+        // ensure the user group was located
+        if (userGroup == null) {
+            throw new UnknownIdentityException(String.format("User group not found: %s.", group));
+        }
+
+        // remove each user in the group
+        for (final User user : userGroup) {
+            users.getUser().remove(user);
+        }
+
+        try {
+            // save the file
+            save();
+        } catch (Exception e) {
+            throw new AuthorityAccessException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Locates the user with the specified DN.
+     *
+     * @param dn
+     * @return
+     */
+    private User getUser(String dn) throws UnknownIdentityException {
+        // ensure the DN was specified
+        if (dn == null) {
+            throw new UnknownIdentityException("User DN not specified.");
+        }
+
+        // attempt to get the user and ensure it was located
+        User desiredUser = null;
+        for (final User user : users.getUser()) {
+            if (dn.equalsIgnoreCase(user.getDn())) {
+                desiredUser = user;
+                break;
+            }
+        }
+
+        return desiredUser;
+    }
+
+    /**
+     * Locates all users that are part of the specified group.
+     *
+     * @param group
+     * @return
+     * @throws UnknownIdentityException
+     */
+    private Collection<User> getUserGroup(String group) throws UnknownIdentityException {
+        // ensure the DN was specified
+        if (group == null) {
+            throw new UnknownIdentityException("User group not specified.");
+        }
+
+        // get all users with this group
+        Collection<User> userGroup = null;
+        for (final User user : users.getUser()) {
+            if (group.equals(user.getGroup())) {
+                if (userGroup == null) {
+                    userGroup = new HashSet<>();
+                }
+                userGroup.add(user);
+            }
+        }
+
+        return userGroup;
+    }
+
+    /**
+     * Saves the users file.
+     *
+     * @throws Exception
+     */
+    private void save() throws Exception {
+        final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+        marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+
+        // save users to restore directory before primary directory
+        if (restoreUsersFile != null) {
+            marshaller.marshal(users, restoreUsersFile);
+        }
+
+        // save users to primary directory
+        marshaller.marshal(users, usersFile);
+    }
+
+    @AuthorityProviderContext
+    public void setNiFiProperties(NiFiProperties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
----------------------------------------------------------------------
diff --git a/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider b/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
new file mode 100755
index 0000000..93d2941
--- /dev/null
+++ b/extensions/file-authorization-provider/src/main/resources/META-INF/services/org.apache.nifi.authorization.AuthorityProvider
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.authorization.FileAuthorizationProvider

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/main/xsd/users.xsd
----------------------------------------------------------------------
diff --git a/extensions/file-authorization-provider/src/main/xsd/users.xsd b/extensions/file-authorization-provider/src/main/xsd/users.xsd
new file mode 100644
index 0000000..4ee1e17
--- /dev/null
+++ b/extensions/file-authorization-provider/src/main/xsd/users.xsd
@@ -0,0 +1,64 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
+    <!-- role -->
+    <xs:complexType name="Role">
+        <xs:attribute name="name">
+            <xs:simpleType>
+                <xs:restriction base="xs:string">
+                    <xs:enumeration value="ROLE_MONITOR"/>
+                    <xs:enumeration value="ROLE_PROVENANCE"/>
+                    <xs:enumeration value="ROLE_DFM"/>
+                    <xs:enumeration value="ROLE_ADMIN"/>
+                    <xs:enumeration value="ROLE_PROXY"/>
+                    <xs:enumeration value="ROLE_NIFI"/>
+                </xs:restriction>
+            </xs:simpleType>
+        </xs:attribute>
+    </xs:complexType>
+
+    <!-- user -->
+    <xs:complexType name="User">
+        <xs:sequence>
+            <xs:element name="role" type="Role" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:sequence>
+        <xs:attribute name="dn">
+            <xs:simpleType>
+                <xs:restriction base="xs:string">
+                    <xs:minLength value="1"/>
+                    <xs:pattern value=".*[^\s].*"/>
+                </xs:restriction>
+            </xs:simpleType>
+        </xs:attribute>
+        <xs:attribute name="group">
+            <xs:simpleType>
+                <xs:restriction base="xs:string">
+                    <xs:minLength value="1"/>
+                    <xs:pattern value=".*[^\s].*"/>
+                </xs:restriction>
+            </xs:simpleType>
+        </xs:attribute>
+    </xs:complexType>
+
+    <!-- users -->
+    <xs:element name="users">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element name="user" type="User" minOccurs="0" maxOccurs="unbounded"/>
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+</xs:schema>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java
----------------------------------------------------------------------
diff --git a/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java b/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java
new file mode 100644
index 0000000..3d0196d
--- /dev/null
+++ b/extensions/file-authorization-provider/src/test/java/org/apache/nifi/authorization/FileAuthorizationProviderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.file.FileUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+import org.junit.Ignore;
+import org.mockito.Mockito;
+
+@Ignore
+public class FileAuthorizationProviderTest {
+    
+    private FileAuthorizationProvider provider;
+    
+    private File primary;
+    
+    private File restore;
+    
+    private NiFiProperties mockProperties;
+    
+    private AuthorityProviderConfigurationContext mockConfigurationContext;
+    
+    @Before
+    public void setup() throws IOException {
+        
+        primary = new File("target/primary/users.txt");
+        restore = new File("target/restore/users.txt");
+        
+        System.out.println("absolute path: " + primary.getAbsolutePath());
+        
+        mockProperties = mock(NiFiProperties.class);
+        when(mockProperties.getRestoreDirectory()).thenReturn(restore.getParentFile());
+        
+        mockConfigurationContext = mock(AuthorityProviderConfigurationContext.class);
+        when(mockConfigurationContext.getProperty(Mockito.eq("Authorized Users File"))).thenReturn(primary.getPath());
+        
+        provider = new FileAuthorizationProvider();
+        provider.setNiFiProperties(mockProperties);
+        provider.initialize(null);
+    }     
+    
+    @After
+    public void cleanup() throws Exception {
+        deleteFile(primary);
+        deleteFile(restore);
+    }
+    
+    private boolean deleteFile(final File file) {
+        if(file.isDirectory()) {
+            FileUtils.deleteFilesInDir(file, null, null, true, true);
+        }
+        return FileUtils.deleteFile(file, null, 10);
+    }
+    
+    @Test
+    public void testPostContructionWhenRestoreDoesNotExist() throws Exception {
+        
+        byte[] primaryBytes = "<users/>".getBytes();
+        FileOutputStream fos = new FileOutputStream(primary);
+        fos.write(primaryBytes);
+        fos.close();
+        
+        provider.onConfigured(mockConfigurationContext);
+        assertEquals(primary.length(), restore.length());
+    }
+    
+    @Test
+    public void testPostContructionWhenPrimaryDoesNotExist() throws Exception {
+        
+        byte[] restoreBytes = "<users/>".getBytes();
+        FileOutputStream fos = new FileOutputStream(restore);
+        fos.write(restoreBytes);
+        fos.close();
+        
+        provider.onConfigured(mockConfigurationContext);
+        assertEquals(restore.length(), primary.length());
+        
+    }
+    
+    @Test(expected = ProviderCreationException.class)
+    public void testPostContructionWhenPrimaryDifferentThanRestore() throws Exception {
+        
+        byte[] primaryBytes = "<users></users>".getBytes();
+        FileOutputStream fos = new FileOutputStream(primary);
+        fos.write(primaryBytes);
+        fos.close();
+        
+        byte[] restoreBytes = "<users/>".getBytes();
+        fos = new FileOutputStream(restore);
+        fos.write(restoreBytes);
+        fos.close();
+        
+        provider.onConfigured(mockConfigurationContext);
+    }
+    
+    @Test
+    public void testPostContructionWhenPrimaryAndBackupDoNotExist() throws Exception {
+        
+        provider.onConfigured(mockConfigurationContext);
+        assertEquals(0, restore.length());
+        assertEquals(restore.length(), primary.length());
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/build-order.sh
----------------------------------------------------------------------
diff --git a/misc/build-order.sh b/misc/build-order.sh
new file mode 100755
index 0000000..855321a
--- /dev/null
+++ b/misc/build-order.sh
@@ -0,0 +1,79 @@
+#MAVEN_FLAGS="-Dmaven.test.skip=true"
+MAVEN_FLAGS=""
+
+cd misc/nar-maven-plugin && \
+mvn $MAVEN_FLAGS install && \
+cd ../../commons/nifi-parent && \
+mvn $MAVEN_FLAGS install && \
+cd ../../nifi-api && \
+mvn $MAVEN_FLAGS install && \
+cd ../commons/ && \
+cd	nifi-stream-utils && \
+mvn $MAVEN_FLAGS install && \
+cd	../wali && \
+mvn $MAVEN_FLAGS install && \
+cd	../flowfile-packager && \
+mvn $MAVEN_FLAGS install && \
+cd	../core-flowfile-attributes && \
+mvn $MAVEN_FLAGS install && \
+cd	../data-provenance-utils && \
+mvn $MAVEN_FLAGS install && \
+cd	../naive-search-ring-buffer && \
+mvn $MAVEN_FLAGS install && \
+cd	../nifi-expression-language && \
+mvn $MAVEN_FLAGS install && \
+cd	../nifi-file-utils && \
+mvn $MAVEN_FLAGS install && \
+cd	../nifi-logging-utils && \
+mvn $MAVEN_FLAGS install && \
+cd	../nifi-properties && \
+mvn $MAVEN_FLAGS install && \
+cd	../nifi-security-utils && \
+mvn $MAVEN_FLAGS install && \
+cd	../nifi-utils && \
+mvn $MAVEN_FLAGS install && \
+cd	../nifi-socket-utils && \
+mvn $MAVEN_FLAGS install && \
+cd	../nifi-web-utils && \
+mvn $MAVEN_FLAGS install && \
+cd	../processor-utilities && \
+mvn $MAVEN_FLAGS install && \
+cd	../remote-communications-utils && \
+mvn $MAVEN_FLAGS install && \
+cd	../search-utils && \
+mvn $MAVEN_FLAGS install && \
+cd ../../extensions/file-authorization-provider && \
+mvn $MAVEN_FLAGS install && \
+cd ../../nifi-mock && \
+mvn $MAVEN_FLAGS install && \
+cd ../nar-bundles/ && \
+cd	nar-container-common && \
+mvn $MAVEN_FLAGS install && \
+cd	../jetty-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../standard-services-api-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../ssl-context-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../distributed-cache-services-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../standard-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../hadoop-libraries-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../hadoop-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../volatile-provenance-repository-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../persistent-provenance-repository-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../framework-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../execute-script-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../monitor-threshold-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd	../update-attribute-bundle && \
+mvn $MAVEN_FLAGS install && \
+cd ../../assemblies/nifi 
+mvn assembly:assembly

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/nar-maven-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/misc/nar-maven-plugin/pom.xml b/misc/nar-maven-plugin/pom.xml
new file mode 100644
index 0000000..3888df3
--- /dev/null
+++ b/misc/nar-maven-plugin/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nar-maven-plugin</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>maven-plugin</packaging>
+    <name>Apache NiFi NAR Plugin</name>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <build>
+        <defaultGoal>install</defaultGoal>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.2</version>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.5</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-plugin-plugin</artifactId>
+                <version>3.3</version>
+            </plugin>
+        </plugins>
+    </build>    
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.maven</groupId>
+            <artifactId>maven-plugin-api</artifactId>
+            <version>2.0.11</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-dependency-plugin</artifactId>
+            <version>2.9</version>
+            <type>maven-plugin</type>
+        </dependency>
+        <dependency>
+            <!-- No code from maven-jar-plugin is actually used; it's included
+            just to simplify the dependencies list.                     -->
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.maven.plugin-tools</groupId>
+            <artifactId>maven-plugin-annotations</artifactId>
+            <version>3.3</version>
+        </dependency>        
+    </dependencies>
+    <distributionManagement>
+        <repository>
+            <id>nifi-releases</id>
+            <url>${nifi.repo.url}</url>
+        </repository>
+        <snapshotRepository>
+            <id>nifi-snapshots</id>
+            <url>${nifi.snapshot.repo.url}</url>
+        </snapshotRepository>
+    </distributionManagement>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java
----------------------------------------------------------------------
diff --git a/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java
new file mode 100644
index 0000000..263fe88
--- /dev/null
+++ b/misc/nar-maven-plugin/src/main/java/nifi/NarMojo.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nifi;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.maven.archiver.MavenArchiveConfiguration;
+import org.apache.maven.archiver.MavenArchiver;
+import org.apache.maven.artifact.Artifact;
+import org.apache.maven.artifact.DependencyResolutionRequiredException;
+import org.apache.maven.artifact.factory.ArtifactFactory;
+import org.apache.maven.artifact.installer.ArtifactInstaller;
+import org.apache.maven.artifact.metadata.ArtifactMetadataSource;
+import org.apache.maven.artifact.repository.ArtifactRepository;
+import org.apache.maven.artifact.repository.ArtifactRepositoryFactory;
+import org.apache.maven.artifact.resolver.ArtifactCollector;
+import org.apache.maven.artifact.resolver.ArtifactNotFoundException;
+import org.apache.maven.artifact.resolver.ArtifactResolutionException;
+import org.apache.maven.artifact.resolver.ArtifactResolver;
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugin.MojoFailureException;
+import org.apache.maven.plugin.dependency.utils.DependencyStatusSets;
+import org.apache.maven.plugin.dependency.utils.DependencyUtil;
+import org.apache.maven.plugin.dependency.utils.filters.DestFileFilter;
+import org.apache.maven.plugin.dependency.utils.resolvers.ArtifactsResolver;
+import org.apache.maven.plugin.dependency.utils.resolvers.DefaultArtifactsResolver;
+import org.apache.maven.plugin.dependency.utils.translators.ArtifactTranslator;
+import org.apache.maven.plugin.dependency.utils.translators.ClassifierTypeTranslator;
+import org.apache.maven.plugins.annotations.LifecyclePhase;
+import org.apache.maven.plugins.annotations.Mojo;
+import org.apache.maven.plugins.annotations.Parameter;
+import org.apache.maven.plugins.annotations.ResolutionScope;
+import org.apache.maven.project.MavenProject;
+import org.apache.maven.execution.MavenSession;
+import org.apache.maven.plugins.annotations.Component;
+import org.apache.maven.project.MavenProjectHelper;
+import org.apache.maven.shared.artifact.filter.collection.ArtifactFilterException;
+import org.apache.maven.shared.artifact.filter.collection.ArtifactIdFilter;
+import org.apache.maven.shared.artifact.filter.collection.ArtifactsFilter;
+import org.apache.maven.shared.artifact.filter.collection.ClassifierFilter;
+import org.apache.maven.shared.artifact.filter.collection.FilterArtifacts;
+import org.apache.maven.shared.artifact.filter.collection.GroupIdFilter;
+import org.apache.maven.shared.artifact.filter.collection.ScopeFilter;
+import org.apache.maven.shared.artifact.filter.collection.ProjectTransitivityFilter;
+import org.apache.maven.shared.artifact.filter.collection.TypeFilter;
+import org.codehaus.plexus.archiver.ArchiverException;
+import org.codehaus.plexus.archiver.jar.JarArchiver;
+import org.codehaus.plexus.archiver.jar.ManifestException;
+import org.codehaus.plexus.archiver.manager.ArchiverManager;
+import org.codehaus.plexus.util.FileUtils;
+import org.codehaus.plexus.util.StringUtils;
+
+/**
+ * Packages the current project as an Apache NiFi Archive (NAR).
+ *
+ * The following code is derived from maven-dependencies-plugin and
+ * maven-jar-plugin. The functionality of CopyDependenciesMojo and JarMojo was
+ * simplified to the use case of NarMojo.
+ *
+ */
+@Mojo(name = "nar", defaultPhase = LifecyclePhase.PACKAGE, threadSafe = false, requiresDependencyResolution = ResolutionScope.RUNTIME)
+public class NarMojo extends AbstractMojo {
+
+    private static final String[] DEFAULT_EXCLUDES = new String[]{"**/package.html"};
+    private static final String[] DEFAULT_INCLUDES = new String[]{"**/**"};
+
+    /**
+     * POM
+     *
+     */
+    @Parameter(property = "project", readonly = true, required = true)
+    protected MavenProject project;
+
+    @Parameter(property = "session", readonly = true, required = true)
+    protected MavenSession session;
+
+    /**
+     * List of files to include. Specified as fileset patterns.
+     */
+    @Parameter(property = "includes")
+    protected String[] includes;
+    /**
+     * List of files to exclude. Specified as fileset patterns.
+     */
+    @Parameter(property = "excludes")
+    protected String[] excludes;
+    /**
+     * Name of the generated NAR.
+     *
+     */
+    @Parameter(alias = "narName", property = "nar.finalName", defaultValue = "${project.build.finalName}", required = true)
+    protected String finalName;
+
+    /**
+     * The Jar archiver.
+     *
+     * \@\component role="org.codehaus.plexus.archiver.Archiver" roleHint="jar"
+     */
+    @Component(role = org.codehaus.plexus.archiver.Archiver.class, hint = "jar")
+    private JarArchiver jarArchiver;
+    /**
+     * The archive configuration to use.
+     *
+     * See <a
+     * href="http://maven.apache.org/shared/maven-archiver/index.html">the
+     * documentation for Maven Archiver</a>.
+     *
+     */
+    @Parameter(property = "archive")
+    protected final MavenArchiveConfiguration archive = new MavenArchiveConfiguration();
+    /**
+     * Path to the default MANIFEST file to use. It will be used if
+     * <code>useDefaultManifestFile</code> is set to <code>true</code>.
+     *
+     */
+    @Parameter(property = "defaultManifestFiles", defaultValue = "${project.build.outputDirectory}/META-INF/MANIFEST.MF", readonly = true, required = true)
+    protected File defaultManifestFile;
+
+    /**
+     * Set this to <code>true</code> to enable the use of the
+     * <code>defaultManifestFile</code>.
+     *
+     * @since 2.2
+     */
+    @Parameter(property = "nar.useDefaultManifestFile", defaultValue = "false")
+    protected boolean useDefaultManifestFile;
+
+    @Component
+    protected MavenProjectHelper projectHelper;
+
+    /**
+     * Whether creating the archive should be forced.
+     *
+     */
+    @Parameter(property = "nar.forceCreation", defaultValue = "false")
+    protected boolean forceCreation;
+
+    /**
+     * Classifier to add to the artifact generated. If given, the artifact will
+     * be an attachment instead.
+     *
+     */
+    @Parameter(property = "classifier")
+    protected String classifier;
+
+    @Component
+    protected ArtifactInstaller installer;
+
+    @Component
+    protected ArtifactRepositoryFactory repositoryFactory;
+
+    /**
+     * This only applies if the classifier parameter is used.
+     *
+     */
+    @Parameter(property = "mdep.failOnMissingClassifierArtifact", defaultValue = "true", required = false)
+    protected boolean failOnMissingClassifierArtifact = true;
+
+    /**
+     * Comma Separated list of Types to include. Empty String indicates include
+     * everything (default).
+     *
+     */
+    @Parameter(property = "includeTypes", required = false)
+    protected String includeTypes;
+
+    /**
+     * Comma Separated list of Types to exclude. Empty String indicates don't
+     * exclude anything (default).
+     *
+     */
+    @Parameter(property = "excludeTypes", required = false)
+    protected String excludeTypes;
+
+    /**
+     * Scope to include. An Empty string indicates all scopes (default).
+     *
+     */
+    @Parameter(property = "includeScope", required = false)
+    protected String includeScope;
+
+    /**
+     * Scope to exclude. An Empty string indicates no scopes (default).
+     *
+     */
+    @Parameter(property = "excludeScope", required = false)
+    protected String excludeScope;
+
+    /**
+     * Comma Separated list of Classifiers to include. Empty String indicates
+     * include everything (default).
+     *
+     */
+    @Parameter(property = "includeClassifiers", required = false)
+    protected String includeClassifiers;
+
+    /**
+     * Comma Separated list of Classifiers to exclude. Empty String indicates
+     * don't exclude anything (default).
+     *
+     */
+    @Parameter(property = "excludeClassifiers", required = false)
+    protected String excludeClassifiers;
+
+    /**
+     * Specify classifier to look for. Example: sources
+     *
+     */
+    @Parameter(property = "classifier", required = false)
+    protected String copyDepClassifier;
+
+    /**
+     * Specify type to look for when constructing artifact based on classifier.
+     * Example: java-source,jar,war, nar
+     *
+     */
+    @Parameter(property = "type", required = false, defaultValue = "nar")
+    protected String type;
+
+    /**
+     * Comma separated list of Artifact names too exclude.
+     *
+     */
+    @Parameter(property = "excludeArtifacts", required = false)
+    protected String excludeArtifactIds;
+
+    /**
+     * Comma separated list of Artifact names to include.
+     *
+     */
+    @Parameter(property = "includeArtifacts", required = false)
+    protected String includeArtifactIds;
+
+    /**
+     * Comma separated list of GroupId Names to exclude.
+     *
+     */
+    @Parameter(property = "excludeArtifacts", required = false)
+    protected String excludeGroupIds;
+
+    /**
+     * Comma separated list of GroupIds to include.
+     *
+     */
+    @Parameter(property = "includeGroupIds", required = false)
+    protected String includeGroupIds;
+
+    /**
+     * Directory to store flag files
+     *
+     */
+    @Parameter(property = "markersDirectory", required = false, defaultValue = "${project.build.directory}/dependency-maven-plugin-markers")
+    protected File markersDirectory;
+
+    /**
+     * Overwrite release artifacts
+     *
+     */
+    @Parameter(property = "overWriteReleases", required = false)
+    protected boolean overWriteReleases;
+
+    /**
+     * Overwrite snapshot artifacts
+     *
+     */
+    @Parameter(property = "overWriteSnapshots", required = false)
+    protected boolean overWriteSnapshots;
+
+    /**
+     * Overwrite artifacts that don't exist or are older than the source.
+     *
+     */
+    @Parameter(property = "overWriteIfNewer", required = false, defaultValue = "true")
+    protected boolean overWriteIfNewer;
+
+    /**
+     * Used to look up Artifacts in the remote repository.
+     */
+    @Component
+    protected ArtifactFactory factory;
+
+    /**
+     * Used to look up Artifacts in the remote repository.
+     *
+     */
+    @Component
+    protected ArtifactResolver resolver;
+
+    /**
+     * Artifact collector, needed to resolve dependencies.
+     *
+     */
+    @Component(role = org.apache.maven.artifact.resolver.ArtifactCollector.class)
+    protected ArtifactCollector artifactCollector;
+
+    @Component(role = org.apache.maven.artifact.metadata.ArtifactMetadataSource.class)
+    protected ArtifactMetadataSource artifactMetadataSource;
+
+    /**
+     * Location of the local repository.
+     *
+     */
+    @Parameter(property = "localRepository", required = true, readonly = true)
+    protected ArtifactRepository local;
+
+    /**
+     * List of Remote Repositories used by the resolver
+     *
+     */
+    @Parameter(property = "project.remoteArtifactRepositories", required = true, readonly = true)
+    protected List remoteRepos;
+
+    /**
+     * To look up Archiver/UnArchiver implementations
+     *
+     */
+    @Component
+    protected ArchiverManager archiverManager;
+
+    /**
+     * Contains the full list of projects in the reactor.
+     *
+     */
+    @Parameter(property = "reactorProjects", required = true, readonly = true)
+    protected List reactorProjects;
+
+    /**
+     * If the plugin should be silent.
+     *
+     */
+    @Parameter(property = "silent", required = false, defaultValue = "false")
+    public boolean silent;
+
+    /**
+     * Output absolute filename for resolved artifacts
+     *
+     */
+    @Parameter(property = "outputAbsoluteArtifactFilename", defaultValue = "false", required = false)
+    protected boolean outputAbsoluteArtifactFilename;
+
+    @Override
+    public void execute() throws MojoExecutionException, MojoFailureException {
+        copyDependencies();
+        makeNar();
+    }
+
+    private void copyDependencies() throws MojoExecutionException {
+        DependencyStatusSets dss = getDependencySets(this.failOnMissingClassifierArtifact);
+        Set artifacts = dss.getResolvedDependencies();
+
+        for (Object artifactObj : artifacts) {
+            copyArtifact((Artifact) artifactObj);
+        }
+
+        artifacts = dss.getSkippedDependencies();
+        for (Object artifactOjb : artifacts) {
+            Artifact artifact = (Artifact) artifactOjb;
+            getLog().info(artifact.getFile().getName() + " already exists in destination.");
+        }
+    }
+
+    protected void copyArtifact(Artifact artifact) throws MojoExecutionException {
+        String destFileName = DependencyUtil.getFormattedFileName(artifact, false);
+        final File destDir = DependencyUtil.getFormattedOutputDirectory(false, false, false, false, false, getDependenciesDirectory(), artifact);
+        final File destFile = new File(destDir, destFileName);
+        copyFile(artifact.getFile(), destFile);
+    }
+
+    protected Artifact getResolvedPomArtifact(Artifact artifact) {
+        Artifact pomArtifact = this.factory.createArtifact(artifact.getGroupId(), artifact.getArtifactId(), artifact.getVersion(), "", "pom");
+        // Resolve the pom artifact using repos
+        try {
+            this.resolver.resolve(pomArtifact, this.remoteRepos, this.local);
+        } catch (ArtifactResolutionException | ArtifactNotFoundException e) {
+            getLog().info(e.getMessage());
+        }
+        return pomArtifact;
+    }
+
+    protected ArtifactsFilter getMarkedArtifactFilter() {
+        return new DestFileFilter(this.overWriteReleases, this.overWriteSnapshots, this.overWriteIfNewer, false, false, false, false, false, getDependenciesDirectory());
+    }
+
+    protected DependencyStatusSets getDependencySets(boolean stopOnFailure) throws MojoExecutionException {
+        // add filters in well known order, least specific to most specific
+        FilterArtifacts filter = new FilterArtifacts();
+
+        filter.addFilter(new ProjectTransitivityFilter(project.getDependencyArtifacts(), false));
+        filter.addFilter(new ScopeFilter(this.includeScope, this.excludeScope));
+        filter.addFilter(new TypeFilter(this.includeTypes, this.excludeTypes));
+        filter.addFilter(new ClassifierFilter(this.includeClassifiers, this.excludeClassifiers));
+        filter.addFilter(new GroupIdFilter(this.includeGroupIds, this.excludeGroupIds));
+        filter.addFilter(new ArtifactIdFilter(this.includeArtifactIds, this.excludeArtifactIds));
+
+        // explicitly filter our nar dependencies
+        filter.addFilter(new TypeFilter("", "nar"));
+
+        // start with all artifacts.
+        Set artifacts = project.getArtifacts();
+
+        // perform filtering
+        try {
+            artifacts = filter.filter(artifacts);
+        } catch (ArtifactFilterException e) {
+            throw new MojoExecutionException(e.getMessage(), e);
+        }
+
+        // transform artifacts if classifier is set
+        final DependencyStatusSets status;
+        if (StringUtils.isNotEmpty(copyDepClassifier)) {
+            status = getClassifierTranslatedDependencies(artifacts, stopOnFailure);
+        } else {
+            status = filterMarkedDependencies(artifacts);
+        }
+
+        return status;
+    }
+
+    protected DependencyStatusSets getClassifierTranslatedDependencies(Set artifacts, boolean stopOnFailure) throws MojoExecutionException {
+        Set unResolvedArtifacts = new HashSet();
+        Set resolvedArtifacts = artifacts;
+        DependencyStatusSets status = new DependencyStatusSets();
+
+        // possibly translate artifacts into a new set of artifacts based on the
+        // classifier and type
+        // if this did something, we need to resolve the new artifacts
+        if (StringUtils.isNotEmpty(copyDepClassifier)) {
+            ArtifactTranslator translator = new ClassifierTypeTranslator(this.copyDepClassifier, this.type, this.factory);
+            artifacts = translator.translate(artifacts, getLog());
+
+            status = filterMarkedDependencies(artifacts);
+
+            // the unskipped artifacts are in the resolved set.
+            artifacts = status.getResolvedDependencies();
+
+            // resolve the rest of the artifacts
+            ArtifactsResolver artifactsResolver = new DefaultArtifactsResolver(this.resolver, this.local,
+                    this.remoteRepos, stopOnFailure);
+            resolvedArtifacts = artifactsResolver.resolve(artifacts, getLog());
+
+            // calculate the artifacts not resolved.
+            unResolvedArtifacts.addAll(artifacts);
+            unResolvedArtifacts.removeAll(resolvedArtifacts);
+        }
+
+        // return a bean of all 3 sets.
+        status.setResolvedDependencies(resolvedArtifacts);
+        status.setUnResolvedDependencies(unResolvedArtifacts);
+
+        return status;
+    }
+
+    protected DependencyStatusSets filterMarkedDependencies(Set artifacts) throws MojoExecutionException {
+        // remove files that have markers already
+        FilterArtifacts filter = new FilterArtifacts();
+        filter.clearFilters();
+        filter.addFilter(getMarkedArtifactFilter());
+
+        Set unMarkedArtifacts;
+        try {
+            unMarkedArtifacts = filter.filter(artifacts);
+        } catch (ArtifactFilterException e) {
+            throw new MojoExecutionException(e.getMessage(), e);
+        }
+
+        // calculate the skipped artifacts
+        Set skippedArtifacts = new HashSet();
+        skippedArtifacts.addAll(artifacts);
+        skippedArtifacts.removeAll(unMarkedArtifacts);
+
+        return new DependencyStatusSets(unMarkedArtifacts, null, skippedArtifacts);
+    }
+
+    protected void copyFile(File artifact, File destFile) throws MojoExecutionException {
+        try {
+            getLog().info("Copying " + (this.outputAbsoluteArtifactFilename ? artifact.getAbsolutePath() : artifact.getName()) + " to " + destFile);
+            FileUtils.copyFile(artifact, destFile);
+        } catch (Exception e) {
+            throw new MojoExecutionException("Error copying artifact from " + artifact + " to " + destFile, e);
+        }
+    }
+
+    private File getClassesDirectory() {
+        final File outputDirectory = new File(project.getBasedir(), "target");
+        return new File(outputDirectory, "classes");
+    }
+
+    private File getDependenciesDirectory() {
+        return new File(getClassesDirectory(), "META-INF/dependencies");
+    }
+
+    private void makeNar() throws MojoExecutionException {
+        File narFile = createArchive();
+
+        if (classifier != null) {
+            projectHelper.attachArtifact(project, "nar", classifier, narFile);
+        } else {
+            project.getArtifact().setFile(narFile);
+        }
+    }
+
+    public File createArchive() throws MojoExecutionException {
+        final File outputDirectory = new File(project.getBasedir(), "target");
+        File narFile = getNarFile(outputDirectory, finalName, classifier);
+        MavenArchiver archiver = new MavenArchiver();
+        archiver.setArchiver(jarArchiver);
+        archiver.setOutputFile(narFile);
+        archive.setForced(forceCreation);
+
+        try {
+            File contentDirectory = getClassesDirectory();
+            if (!contentDirectory.exists()) {
+                getLog().warn("NAR will be empty - no content was marked for inclusion!");
+            } else {
+                archiver.getArchiver().addDirectory(contentDirectory, getIncludes(), getExcludes());
+            }
+
+            File existingManifest = defaultManifestFile;
+            if (useDefaultManifestFile && existingManifest.exists() && archive.getManifestFile() == null) {
+                getLog().info("Adding existing MANIFEST to archive. Found under: " + existingManifest.getPath());
+                archive.setManifestFile(existingManifest);
+            }
+
+            // automatically add the artifact id to the manifest
+            archive.addManifestEntry("Nar-Id", project.getArtifactId());
+
+            // look for a nar dependency
+            String narDependency = getNarDependency();
+            if (narDependency != null) {
+                archive.addManifestEntry("Nar-Dependency-Id", narDependency);
+            }
+
+            archiver.createArchive(session, project, archive);
+            return narFile;
+        } catch (ArchiverException | MojoExecutionException | ManifestException | IOException | DependencyResolutionRequiredException e) {
+            throw new MojoExecutionException("Error assembling NAR", e);
+        }
+    }
+
+    private String[] getIncludes() {
+        if (includes != null && includes.length > 0) {
+            return includes;
+        }
+        return DEFAULT_INCLUDES;
+    }
+
+    private String[] getExcludes() {
+        if (excludes != null && excludes.length > 0) {
+            return excludes;
+        }
+        return DEFAULT_EXCLUDES;
+    }
+
+    protected File getNarFile(File basedir, String finalName, String classifier) {
+        if (classifier == null) {
+            classifier = "";
+        } else if (classifier.trim().length() > 0 && !classifier.startsWith("-")) {
+            classifier = "-" + classifier;
+        }
+
+        return new File(basedir, finalName + classifier + ".nar");
+    }
+
+    private String getNarDependency() throws MojoExecutionException {
+        String narDependency = null;
+
+        // get nar dependencies
+        FilterArtifacts filter = new FilterArtifacts();
+        filter.addFilter(new TypeFilter("nar", ""));
+
+        // start with all artifacts.
+        Set artifacts = project.getArtifacts();
+
+        // perform filtering
+        try {
+            artifacts = filter.filter(artifacts);
+        } catch (ArtifactFilterException e) {
+            throw new MojoExecutionException(e.getMessage(), e);
+        }
+
+        // ensure there is a single nar dependency
+        if (artifacts.size() > 1) {
+            throw new MojoExecutionException("Each NAR represents a ClassLoader. A NAR dependency allows that NAR's ClassLoader to be "
+                    + "used as the parent of this NAR's ClassLoader. As a result, only a single NAR dependency is allowed.");
+        } else if (artifacts.size() == 1) {
+            final Artifact artifact = (Artifact) artifacts.iterator().next();
+            narDependency = artifact.getArtifactId();
+        }
+
+        return narDependency;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml
----------------------------------------------------------------------
diff --git a/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml b/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml
new file mode 100644
index 0000000..0680d18
--- /dev/null
+++ b/misc/nar-maven-plugin/src/main/resources/META-INF/plexus/components.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<component-set>
+    <components>
+        <component>
+            <role>org.apache.maven.lifecycle.mapping.LifecycleMapping</role>
+            <role-hint>nar</role-hint>
+            <implementation>org.apache.maven.lifecycle.mapping.DefaultLifecycleMapping</implementation>
+            <configuration>
+                <lifecycles>
+                    <lifecycle>
+                        <id>default</id>
+                        <phases>
+                            <process-resources>org.apache.maven.plugins:maven-resources-plugin:resources</process-resources>
+                            <compile>org.apache.maven.plugins:maven-compiler-plugin:compile</compile>
+                            <process-test-resources>org.apache.maven.plugins:maven-resources-plugin:testResources</process-test-resources>
+                            <test-compile>org.apache.maven.plugins:maven-compiler-plugin:testCompile</test-compile>
+                            <test>org.apache.maven.plugins:maven-surefire-plugin:test</test>
+                            <package>org.apache.nifi:nar-maven-plugin:nar</package>
+                            <install>org.apache.maven.plugins:maven-install-plugin:install</install>
+                            <deploy>org.apache.maven.plugins:maven-deploy-plugin:deploy</deploy>
+                        </phases>
+                    </lifecycle>
+                </lifecycles>
+            </configuration>
+        </component>
+        <component>
+            <role>org.apache.maven.artifact.handler.ArtifactHandler</role>
+            <role-hint>nar</role-hint>
+            <implementation>org.apache.maven.artifact.handler.DefaultArtifactHandler</implementation>
+            <configuration>
+                <type>nar</type>
+                <language>java</language>
+                <addedToClasspath>false</addedToClasspath>
+                <includesDependencies>true</includesDependencies>
+            </configuration>
+        </component>
+    </components>
+</component-set>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
new file mode 100644
index 0000000..6280349
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/pom.xml
@@ -0,0 +1,67 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.nifi</groupId>
+		<artifactId>distributed-cache-services-bundle</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>distributed-cache-client-service</artifactId>
+	<packaging>jar</packaging>
+
+	<name>Distributed Cache Client Service</name>
+	<description>Provides a Client for interfacing with a Distributed Cache</description>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>distributed-cache-client-service-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>distributed-cache-protocol</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>remote-communications-utils</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-processor-utils</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-stream-utils</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>ssl-context-service-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.9</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
new file mode 100644
index 0000000..f838c2f
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CommsSession.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+public interface CommsSession extends Closeable {
+
+    void setTimeout(final long value, final TimeUnit timeUnit);
+    
+    InputStream getInputStream() throws IOException;
+    
+    OutputStream getOutputStream() throws IOException;
+    
+    boolean isClosed();
+    
+    void interrupt();
+    
+    String getHostname();
+    
+    int getPort();
+    
+    long getTimeout(TimeUnit timeUnit);
+    
+    SSLContext getSSLContext();
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
----------------------------------------------------------------------
diff --git a/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
new file mode 100644
index 0000000..ee96660
--- /dev/null
+++ b/nar-bundles/distributed-cache-services-bundle/distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.distributed.cache.client;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.distributed.cache.protocol.ProtocolHandshake;
+import org.apache.nifi.distributed.cache.protocol.exception.HandshakeException;
+import org.apache.nifi.io.ByteArrayOutputStream;
+import org.apache.nifi.io.DataOutputStream;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.remote.StandardVersionNegotiator;
+import org.apache.nifi.remote.VersionNegotiator;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.ssl.SSLContextService.ClientAuth;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistributedMapCacheClientService extends AbstractControllerService implements DistributedMapCacheClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedMapCacheClientService.class);
+
+    public static final PropertyDescriptor HOSTNAME = new PropertyDescriptor.Builder()
+            .name("Server Hostname")
+            .description("The name of the server that is running the DistributedMapCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
+            .name("Server Port")
+            .description("The port on the remote server that is to be used when communicating with the DistributedMapCacheServer service")
+            .required(true)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .defaultValue("4557")
+            .build();
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description(
+                    "If specified, indicates the SSL Context Service that is used to communicate with the remote server. If not specified, communications will not be encrypted")
+            .required(false)
+            .addValidator(StandardValidators.createControllerServiceExistsValidator(SSLContextService.class))
+            .defaultValue(null)
+            .build();
+    public static final PropertyDescriptor COMMUNICATIONS_TIMEOUT = new PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .description(
+                    "Specifies how long to wait when communicating with the remote server before determining that there is a communications failure if data cannot be sent or received")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    private final BlockingQueue<CommsSession> queue = new LinkedBlockingQueue<>();
+    private volatile ConfigurationContext configContext;
+    private volatile boolean closed = false;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(HOSTNAME);
+        descriptors.add(PORT);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(COMMUNICATIONS_TIMEOUT);
+        return descriptors;
+    }
+
+    @OnConfigured
+    public void cacheConfig(final ConfigurationContext context) {
+        this.configContext = context;
+    }
+
+    @Override
+    public <K, V> boolean putIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer)
+            throws IOException {
+        return withCommsSession(new CommsAction<Boolean>() {
+            @Override
+            public Boolean execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("putIfAbsent");
+
+                serialize(key, keySerializer, dos);
+                serialize(value, valueSerializer, dos);
+
+                dos.flush();
+
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                return dis.readBoolean();
+            }
+        });
+    }
+
+    @Override
+    public <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
+        return withCommsSession(new CommsAction<Boolean>() {
+            @Override
+            public Boolean execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("containsKey");
+
+                serialize(key, keySerializer, dos);
+                dos.flush();
+
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                return dis.readBoolean();
+            }
+        });
+    }
+
+    @Override
+    public <K, V> V getAndPutIfAbsent(final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
+            final Deserializer<V> valueDeserializer) throws IOException {
+        return withCommsSession(new CommsAction<V>() {
+            @Override
+            public V execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("getAndPutIfAbsent");
+
+                serialize(key, keySerializer, dos);
+                serialize(value, valueSerializer, dos);
+                dos.flush();
+
+                // read response
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+                return valueDeserializer.deserialize(responseBuffer);
+            }
+        });
+    }
+
+    @Override
+    public <K, V> V get(final K key, final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
+        return withCommsSession(new CommsAction<V>() {
+            @Override
+            public V execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("get");
+
+                serialize(key, keySerializer, dos);
+                dos.flush();
+
+                // read response
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                final byte[] responseBuffer = readLengthDelimitedResponse(dis);
+                return valueDeserializer.deserialize(responseBuffer);
+            }
+        });
+    }
+
+    @Override
+    public <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
+        return withCommsSession(new CommsAction<Boolean>() {
+            @Override
+            public Boolean execute(final CommsSession session) throws IOException {
+                final DataOutputStream dos = new DataOutputStream(session.getOutputStream());
+                dos.writeUTF("remove");
+
+                serialize(key, serializer, dos);
+                dos.flush();
+
+                // read response
+                final DataInputStream dis = new DataInputStream(session.getInputStream());
+                return dis.readBoolean();
+            }
+        });
+    }
+
+    private byte[] readLengthDelimitedResponse(final DataInputStream dis) throws IOException {
+        final int responseLength = dis.readInt();
+        final byte[] responseBuffer = new byte[responseLength];
+        dis.readFully(responseBuffer);
+        return responseBuffer;
+    }
+
+    public CommsSession createCommsSession(final ConfigurationContext context) throws IOException {
+        final String hostname = context.getProperty(HOSTNAME).getValue();
+        final int port = context.getProperty(PORT).asInteger();
+        final long timeoutMillis = context.getProperty(COMMUNICATIONS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS);
+        final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+
+        final CommsSession commsSession;
+        if (sslContextService == null) {
+            commsSession = new StandardCommsSession(hostname, port);
+        } else {
+            commsSession = new SSLCommsSession(sslContextService.createSSLContext(ClientAuth.REQUIRED), hostname, port);
+        }
+
+        commsSession.setTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
+        return commsSession;
+    }
+
+    private CommsSession leaseCommsSession() throws IOException {
+        CommsSession session = queue.poll();
+        if (session != null && !session.isClosed()) {
+            return session;
+        }
+
+        session = createCommsSession(configContext);
+        final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(1);
+        try {
+            ProtocolHandshake.initiateHandshake(session.getInputStream(), session.getOutputStream(), versionNegotiator);
+        } catch (final HandshakeException e) {
+            try {
+                session.close();
+            } catch (final IOException ioe) {
+            }
+
+            throw new IOException(e);
+        }
+
+        return session;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.closed = true;
+
+        CommsSession commsSession;
+        while ((commsSession = queue.poll()) != null) {
+            try (final DataOutputStream dos = new DataOutputStream(commsSession.getOutputStream())) {
+                dos.writeUTF("close");
+                dos.flush();
+                commsSession.close();
+            } catch (final IOException e) {
+            }
+        }
+        logger.info("Closed {}", new Object[] { getIdentifier() });
+    }
+
+    @Override
+    protected void finalize() throws Throwable {
+        if (!closed)
+            close();
+        logger.debug("Finalize called");
+    }
+
+    private <T> void serialize(final T value, final Serializer<T> serializer, final DataOutputStream dos) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        serializer.serialize(value, baos);
+        dos.writeInt(baos.size());
+        baos.writeTo(dos);
+    }
+
+    private <T> T withCommsSession(final CommsAction<T> action) throws IOException {
+        if (closed) {
+            throw new IllegalStateException("Client is closed");
+        }
+
+        final CommsSession session = leaseCommsSession();
+        try {
+            return action.execute(session);
+        } catch (final IOException ioe) {
+            try {
+                session.close();
+            } catch (final IOException ignored) {
+            }
+
+            throw ioe;
+        } finally {
+            if (!session.isClosed()) {
+                if (this.closed) {
+                    try {
+                        session.close();
+                    } catch (final IOException ioe) {
+                    }
+                } else {
+                    queue.offer(session);
+                }
+            }
+        }
+    }
+
+    private static interface CommsAction<T> {
+        T execute(CommsSession commsSession) throws IOException;
+    }
+
+}