You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2016/07/25 16:37:53 UTC
nifi git commit: NIFI-1950 Updating FileAuthorizer to convert access
controls from input and output ports during legacy conversion. This closes
#702.
Repository: nifi
Updated Branches:
refs/heads/master b08285859 -> 3e9867d5d
NIFI-1950 Updating FileAuthorizer to convert access controls from input and output ports during legacy conversion. This closes #702.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3e9867d5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3e9867d5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3e9867d5
Branch: refs/heads/master
Commit: 3e9867d5da3b11f7ebba33844bb360024f662aee
Parents: b082858
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Jul 21 12:18:09 2016 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Jul 25 12:37:26 2016 -0400
----------------------------------------------------------------------
.../nifi/authorization/FileAuthorizer.java | 220 +++++++++++--------
.../org/apache/nifi/authorization/FlowInfo.java | 45 ++++
.../apache/nifi/authorization/FlowParser.java | 181 +++++++++++++++
.../nifi/authorization/FileAuthorizerTest.java | 54 ++++-
.../src/test/resources/flow-no-ports.xml.gz | Bin 0 -> 730 bytes
.../src/test/resources/flow.xml.gz | Bin 566 -> 730 bytes
.../src/main/resources/conf/authorizers.xml | 5 +-
.../nifi/web/api/DataTransferResource.java | 12 +
8 files changed, 419 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e9867d5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAuthorizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAuthorizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAuthorizer.java
index a672b30..f571c32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAuthorizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FileAuthorizer.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.authorization;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.annotation.AuthorizerContext;
import org.apache.nifi.authorization.exception.AuthorizationAccessException;
@@ -26,14 +25,14 @@ import org.apache.nifi.authorization.file.generated.Groups;
import org.apache.nifi.authorization.file.generated.Policies;
import org.apache.nifi.authorization.file.generated.Policy;
import org.apache.nifi.authorization.file.generated.Users;
+import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.web.api.dto.PortDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
import org.xml.sax.SAXException;
import javax.xml.XMLConstants;
@@ -42,20 +41,12 @@ import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
-import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -68,7 +59,6 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import java.util.zip.GZIPInputStream;
/**
* Provides authorizes requests to resources using policies persisted in a file.
@@ -83,8 +73,6 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
private static final String USERS_XSD = "/users.xsd";
private static final String JAXB_USERS_PATH = "org.apache.nifi.user.generated";
- private static final String FLOW_XSD = "/FlowConfiguration.xsd";
-
private static final JAXBContext JAXB_AUTHORIZATIONS_CONTEXT = initializeJaxbContext(JAXB_AUTHORIZATIONS_PATH);
private static final JAXBContext JAXB_USERS_CONTEXT = initializeJaxbContext(JAXB_USERS_PATH);
@@ -107,7 +95,6 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
static final String PROP_LEGACY_AUTHORIZED_USERS_FILE = "Legacy Authorized Users File";
static final Pattern NODE_IDENTITY_PATTERN = Pattern.compile("Node Identity \\S+");
- private Schema flowSchema;
private Schema usersSchema;
private Schema authorizationsSchema;
private SchemaFactory schemaFactory;
@@ -118,6 +105,7 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
private String initialAdminIdentity;
private String legacyAuthorizedUsersFile;
private Set<String> nodeIdentities;
+ private List<PortDTO> ports = new ArrayList<>();
private final AtomicReference<AuthorizationsHolder> authorizationsHolder = new AtomicReference<>();
@@ -127,7 +115,6 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
authorizationsSchema = schemaFactory.newSchema(FileAuthorizer.class.getResource(AUTHORIZATIONS_XSD));
usersSchema = schemaFactory.newSchema(FileAuthorizer.class.getResource(USERS_XSD));
- flowSchema = schemaFactory.newSchema(FileAuthorizer.class.getResource(FLOW_XSD));
} catch (Exception e) {
throw new AuthorizerCreationException(e);
}
@@ -200,7 +187,7 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
logger.info(String.format("Authorizations file loaded at %s", new Date().toString()));
- } catch (IOException | AuthorizerCreationException | JAXBException | IllegalStateException e) {
+ } catch (IOException | AuthorizerCreationException | JAXBException | IllegalStateException | SAXException e) {
throw new AuthorizerCreationException(e);
}
}
@@ -212,7 +199,7 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
* @throws IOException Unable to sync file with restore
* @throws IllegalStateException Unable to sync file with restore
*/
- private synchronized void load() throws JAXBException, IOException, IllegalStateException {
+ private synchronized void load() throws JAXBException, IOException, IllegalStateException, SAXException {
// attempt to unmarshal
final Unmarshaller unmarshaller = JAXB_AUTHORIZATIONS_CONTEXT.createUnmarshaller();
unmarshaller.setSchema(authorizationsSchema);
@@ -237,8 +224,7 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
// if we are starting fresh then we might need to populate an initial admin or convert legacy users
if (emptyAuthorizations) {
- // try to extract the root group id from the flow configuration file specified in nifi.properties
- rootGroupId = getRootGroupId();
+ parseFlow();
if (hasInitialAdminIdentity && hasLegacyAuthorizedUsers) {
throw new AuthorizerCreationException("Cannot provide an Initial Admin Identity and a Legacy Authorized Users File");
@@ -260,68 +246,17 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
}
/**
- * Extracts the root group id from the flow configuration file provided in nifi.properties.
+ * Try to parse the flow configuration file to extract the root group id and port information.
*
- * @return the root group id, or null if the files doesn't exist, was empty, or could not be parsed
+ * @throws SAXException if an error occurs creating the schema
*/
- private String getRootGroupId() {
- final File flowFile = properties.getFlowConfigurationFile();
- if (flowFile == null) {
- logger.debug("Flow Configuration file was null");
- return null;
- }
+ private void parseFlow() throws SAXException {
+ final FlowParser flowParser = new FlowParser();
+ final FlowInfo flowInfo = flowParser.parse(properties.getFlowConfigurationFile());
- // if the flow doesn't exist or is 0 bytes, then return null
- final Path flowPath = flowFile.toPath();
- try {
- if (!Files.exists(flowPath) || Files.size(flowPath) == 0) {
- logger.debug("Flow Configuration does not exist or was empty");
- return null;
- }
- } catch (IOException e) {
- logger.debug("An error occurred determining the size of the Flow Configuration file");
- return null;
- }
-
- // otherwise create the appropriate input streams to read the file
- try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ);
- final InputStream gzipIn = new GZIPInputStream(in)) {
-
- final byte[] flowBytes = IOUtils.toByteArray(gzipIn);
- if (flowBytes == null || flowBytes.length == 0) {
- logger.debug("Could not extract root group id because Flow Configuration File was empty");
- return null;
- }
-
- // create validating document builder
- final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
- docFactory.setNamespaceAware(true);
- docFactory.setSchema(flowSchema);
-
- // parse the flow
- final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
- final Document document = docBuilder.parse(new ByteArrayInputStream(flowBytes));
-
- // extract the root group id
- final Element rootElement = document.getDocumentElement();
-
- final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
- if (rootGroupElement == null) {
- logger.debug("rootGroup element not found in Flow Configuration file");
- return null;
- }
-
- final Element rootGroupIdElement = (Element) rootGroupElement.getElementsByTagName("id").item(0);
- if (rootGroupIdElement == null) {
- logger.debug("id element not found under rootGroup in Flow Configuration file");
- return null;
- }
-
- return rootGroupIdElement.getTextContent();
-
- } catch (final SAXException | ParserConfigurationException | IOException ex) {
- logger.error("Unable to find root group id in {} due to {}", new Object[] { flowPath.toAbsolutePath(), ex });
- return null;
+ if (flowInfo != null) {
+ rootGroupId = flowInfo.getRootGroupId();
+ ports = flowInfo.getPorts() == null ? new ArrayList<>() : flowInfo.getPorts();
}
}
@@ -352,6 +287,10 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
// grant the user read/write access to the /policies resource
addAccessPolicy(authorizations, ResourceType.Policy.getValue(), adminUser.getIdentifier(), READ_CODE);
addAccessPolicy(authorizations, ResourceType.Policy.getValue(), adminUser.getIdentifier(), WRITE_CODE);
+
+ // grant the user read/write access to the /controller resource
+ addAccessPolicy(authorizations, ResourceType.Controller.getValue(), adminUser.getIdentifier(), READ_CODE);
+ addAccessPolicy(authorizations, ResourceType.Controller.getValue(), adminUser.getIdentifier(), WRITE_CODE);
}
/**
@@ -428,8 +367,8 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
for (org.apache.nifi.user.generated.User legacyUser : users.getUser()) {
// create the identifier of the new user based on the DN
- String legacyUserDn = legacyUser.getDn();
- String userIdentifier = UUID.nameUUIDFromBytes(legacyUserDn.getBytes(StandardCharsets.UTF_8)).toString();
+ final String legacyUserDn = legacyUser.getDn();
+ final String userIdentifier = UUID.nameUUIDFromBytes(legacyUserDn.getBytes(StandardCharsets.UTF_8)).toString();
// create the new User and add it to the list of users
org.apache.nifi.authorization.file.generated.User user = new org.apache.nifi.authorization.file.generated.User();
@@ -459,30 +398,129 @@ public class FileAuthorizer extends AbstractPolicyBasedAuthorizer {
roleAccessPolicy.getResource(),
roleAccessPolicy.getAction());
- // determine if the user already exists in the policy
- boolean userExists = false;
- for (Policy.User policyUser : policy.getUser()) {
- if (policyUser.getIdentifier().equals(userIdentifier)) {
- userExists = true;
+ // add the user to the policy if it doesn't exist
+ addUserToPolicy(userIdentifier, policy);
+ }
+ }
+
+ }
+
+ // convert any access controls on ports to the appropriate policies
+ for (PortDTO portDTO : ports) {
+ final boolean isInputPort = portDTO.getType() != null && portDTO.getType().equals("inputPort");
+ final Resource resource = ResourceFactory.getDataTransferResource(isInputPort, portDTO.getId(), portDTO.getName());
+
+ if (portDTO.getUserAccessControl() != null) {
+ for (String userAccessControl : portDTO.getUserAccessControl()) {
+ // find a user where the identity is the userAccessControl
+ org.apache.nifi.authorization.file.generated.User foundUser = null;
+ for (org.apache.nifi.authorization.file.generated.User jaxbUser : authorizations.getUsers().getUser()) {
+ if (jaxbUser.getIdentity().equals(userAccessControl)) {
+ foundUser = jaxbUser;
break;
}
}
- // add the user to the policy if doesn't already exist
- if (!userExists) {
- Policy.User policyUser = new Policy.User();
- policyUser.setIdentifier(userIdentifier);
- policy.getUser().add(policyUser);
+ // couldn't find the user matching the access control so log a warning and skip
+ if (foundUser == null) {
+ logger.warn("Found port with user access control for {} but no user exists with this identity, skipping...",
+ new Object[] {userAccessControl});
+ continue;
}
+
+ // we found the user so create the appropriate policy and add the user to it
+ Policy policy = getOrCreatePolicy(
+ allPolicies,
+ seedIdentity,
+ resource.getIdentifier(),
+ WRITE_CODE);
+
+ addUserToPolicy(foundUser.getIdentifier(), policy);
}
}
+ if (portDTO.getGroupAccessControl() != null) {
+ for (String groupAccessControl : portDTO.getGroupAccessControl()) {
+ // find a group where the name is the groupAccessControl
+ org.apache.nifi.authorization.file.generated.Group foundGroup = null;
+ for (org.apache.nifi.authorization.file.generated.Group jaxbGroup : authorizations.getGroups().getGroup()) {
+ if (jaxbGroup.getName().equals(groupAccessControl)) {
+ foundGroup = jaxbGroup;
+ break;
+ }
+ }
+
+ // couldn't find the group matching the access control so log a warning and skip
+ if (foundGroup == null) {
+ logger.warn("Found port with group access control for {} but no group exists with this name, skipping...",
+ new Object[] {groupAccessControl});
+ continue;
+ }
+
+ // we found the group so create the appropriate policy and add all the users to it
+ Policy policy = getOrCreatePolicy(
+ allPolicies,
+ seedIdentity,
+ resource.getIdentifier(),
+ WRITE_CODE);
+
+ addGroupToPolicy(foundGroup.getIdentifier(), policy);
+ }
+ }
}
authorizations.getPolicies().getPolicy().addAll(allPolicies);
}
/**
+ * Adds the given user identifier to the policy if it doesn't already exist.
+ *
+ * @param userIdentifier a user identifier
+ * @param policy a policy to add the user to
+ */
+ private void addUserToPolicy(final String userIdentifier, final Policy policy) {
+ // determine if the user already exists in the policy
+ boolean userExists = false;
+ for (Policy.User policyUser : policy.getUser()) {
+ if (policyUser.getIdentifier().equals(userIdentifier)) {
+ userExists = true;
+ break;
+ }
+ }
+
+ // add the user to the policy if doesn't already exist
+ if (!userExists) {
+ Policy.User policyUser = new Policy.User();
+ policyUser.setIdentifier(userIdentifier);
+ policy.getUser().add(policyUser);
+ }
+ }
+
+ /**
+ * Adds the given group identifier to the policy if it doesn't already exist.
+ *
+ * @param groupIdentifier a group identifier
+ * @param policy a policy to add the user to
+ */
+ private void addGroupToPolicy(final String groupIdentifier, final Policy policy) {
+ // determine if the group already exists in the policy
+ boolean groupExists = false;
+ for (Policy.Group policyGroup : policy.getGroup()) {
+ if (policyGroup.getIdentifier().equals(groupIdentifier)) {
+ groupExists = true;
+ break;
+ }
+ }
+
+ // add the group to the policy if doesn't already exist
+ if (!groupExists) {
+ Policy.Group policyGroup = new Policy.Group();
+ policyGroup.setIdentifier(groupIdentifier);
+ policy.getGroup().add(policyGroup);
+ }
+ }
+
+ /**
* Finds the Group with the given name, or creates a new one and adds it to Authorizations.
*
* @param authorizations the Authorizations reference
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e9867d5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FlowInfo.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FlowInfo.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FlowInfo.java
new file mode 100644
index 0000000..413bce3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FlowInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import org.apache.nifi.web.api.dto.PortDTO;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class FlowInfo {
+
+ private final String rootGroupId;
+
+ private final List<PortDTO> ports;
+
+ public FlowInfo(final String rootGroupId, final List<PortDTO> ports) {
+ this.rootGroupId = rootGroupId;
+ this.ports = (ports == null ? Collections.unmodifiableList(Collections.EMPTY_LIST) :
+ Collections.unmodifiableList(new ArrayList<>(ports)) );
+ }
+
+ public String getRootGroupId() {
+ return rootGroupId;
+ }
+
+ public List<PortDTO> getPorts() {
+ return ports;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e9867d5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FlowParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FlowParser.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FlowParser.java
new file mode 100644
index 0000000..1384aeb
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/main/java/org/apache/nifi/authorization/FlowParser.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.authorization;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Parses a flow and returns the root group id and root group ports.
+ */
+public class FlowParser {
+
+ private static final Logger logger = LoggerFactory.getLogger(FlowParser.class);
+
+ private static final String FLOW_XSD = "/FlowConfiguration.xsd";
+
+ private Schema flowSchema;
+ private SchemaFactory schemaFactory;
+
+ public FlowParser() throws SAXException {
+ schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ flowSchema = schemaFactory.newSchema(FileAuthorizer.class.getResource(FLOW_XSD));
+ }
+
+ /**
+ * Extracts the root group id from the flow configuration file provided in nifi.properties, and extracts
+ * the root group input ports and output ports, and their access controls.
+ *
+ */
+ public FlowInfo parse(final File flowConfigurationFile) {
+ if (flowConfigurationFile == null) {
+ logger.debug("Flow Configuration file was null");
+ return null;
+ }
+
+ // if the flow doesn't exist or is 0 bytes, then return null
+ final Path flowPath = flowConfigurationFile.toPath();
+ try {
+ if (!Files.exists(flowPath) || Files.size(flowPath) == 0) {
+ logger.warn("Flow Configuration does not exist or was empty");
+ return null;
+ }
+ } catch (IOException e) {
+ logger.error("An error occurred determining the size of the Flow Configuration file");
+ return null;
+ }
+
+ // otherwise create the appropriate input streams to read the file
+ try (final InputStream in = Files.newInputStream(flowPath, StandardOpenOption.READ);
+ final InputStream gzipIn = new GZIPInputStream(in)) {
+
+ final byte[] flowBytes = IOUtils.toByteArray(gzipIn);
+ if (flowBytes == null || flowBytes.length == 0) {
+ logger.warn("Could not extract root group id because Flow Configuration File was empty");
+ return null;
+ }
+
+ // create validating document builder
+ final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+ docFactory.setNamespaceAware(true);
+ docFactory.setSchema(flowSchema);
+
+ // parse the flow
+ final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+ final Document document = docBuilder.parse(new ByteArrayInputStream(flowBytes));
+
+ // extract the root group id
+ final Element rootElement = document.getDocumentElement();
+
+ final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
+ if (rootGroupElement == null) {
+ logger.warn("rootGroup element not found in Flow Configuration file");
+ return null;
+ }
+
+ final Element rootGroupIdElement = (Element) rootGroupElement.getElementsByTagName("id").item(0);
+ if (rootGroupIdElement == null) {
+ logger.warn("id element not found under rootGroup in Flow Configuration file");
+ return null;
+ }
+
+ final String rootGroupId = rootGroupIdElement.getTextContent();
+
+ final List<PortDTO> ports = new ArrayList<>();
+ ports.addAll(getPorts(rootGroupElement, "inputPort"));
+ ports.addAll(getPorts(rootGroupElement, "outputPort"));
+
+ return new FlowInfo(rootGroupId, ports);
+
+ } catch (final SAXException | ParserConfigurationException | IOException ex) {
+ logger.error("Unable to parse flow {} due to {}", new Object[] { flowPath.toAbsolutePath(), ex });
+ return null;
+ }
+ }
+
+ /**
+ * Gets the ports that are direct children of the given element.
+ *
+ * @param element the element containing ports
+ * @param type the type of port to find (inputPort or outputPort)
+ * @return a list of PortDTOs representing the found ports
+ */
+ private List<PortDTO> getPorts(final Element element, final String type) {
+ final List<PortDTO> ports = new ArrayList<>();
+
+ // add input ports
+ final List<Element> portNodeList = getChildrenByTagName(element, type);
+ for (final Element portElement : portNodeList) {
+ final PortDTO portDTO = FlowFromDOMFactory.getPort(portElement);
+ portDTO.setType(type);
+ ports.add(portDTO);
+ }
+
+ return ports;
+ }
+
+ /**
+ * Finds child elements with the given tagName.
+ *
+ * @param element the parent element
+ * @param tagName the child element name to find
+ * @return a list of matching child elements
+ */
+ private static List<Element> getChildrenByTagName(final Element element, final String tagName) {
+ final List<Element> matches = new ArrayList<>();
+ final NodeList nodeList = element.getChildNodes();
+ for (int i = 0; i < nodeList.getLength(); i++) {
+ final Node node = nodeList.item(i);
+ if (!(node instanceof Element)) {
+ continue;
+ }
+
+ final Element child = (Element) nodeList.item(i);
+ if (child.getNodeName().equals(tagName)) {
+ matches.add(child);
+ }
+ }
+
+ return matches;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e9867d5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java
index 1eed4e6..d1cf0c4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/java/org/apache/nifi/authorization/FileAuthorizerTest.java
@@ -111,6 +111,7 @@ public class FileAuthorizerTest {
private File primary;
private File restore;
private File flow;
+ private File flowNoPorts;
private AuthorizerConfigurationContext configurationContext;
@@ -127,6 +128,9 @@ public class FileAuthorizerTest {
flow = new File("src/test/resources/flow.xml.gz");
FileUtils.ensureDirectoryExistAndCanAccess(flow.getParentFile());
+ flowNoPorts = new File("src/test/resources/flow-no-ports.xml.gz");
+ FileUtils.ensureDirectoryExistAndCanAccess(flowNoPorts.getParentFile());
+
properties = mock(NiFiProperties.class);
when(properties.getRestoreDirectory()).thenReturn(restore.getParentFile());
when(properties.getFlowConfigurationFile()).thenReturn(flow);
@@ -166,6 +170,29 @@ public class FileAuthorizerTest {
}
@Test
+ public void testOnConfiguredWhenLegacyUsersFileProvidedAndFlowHasNoPorts() throws Exception {
+ properties = mock(NiFiProperties.class);
+ when(properties.getRestoreDirectory()).thenReturn(restore.getParentFile());
+ when(properties.getFlowConfigurationFile()).thenReturn(flowNoPorts);
+
+ when(configurationContext.getProperty(Mockito.eq(FileAuthorizer.PROP_LEGACY_AUTHORIZED_USERS_FILE)))
+ .thenReturn(new StandardPropertyValue("src/test/resources/authorized-users.xml", null));
+
+ writeAuthorizationsFile(primary, EMPTY_AUTHORIZATIONS_CONCISE);
+ authorizer.onConfigured(configurationContext);
+
+ boolean foundDataTransferPolicy = false;
+ for (AccessPolicy policy : authorizer.getAccessPolicies()) {
+ if (policy.getResource().contains(ResourceType.DataTransfer.name())) {
+ foundDataTransferPolicy = true;
+ break;
+ }
+ }
+
+ assertFalse(foundDataTransferPolicy);
+ }
+
+ @Test
public void testOnConfiguredWhenLegacyUsersFileProvided() throws Exception {
when(configurationContext.getProperty(Mockito.eq(FileAuthorizer.PROP_LEGACY_AUTHORIZED_USERS_FILE)))
.thenReturn(new StandardPropertyValue("src/test/resources/authorized-users.xml", null));
@@ -198,7 +225,8 @@ public class FileAuthorizerTest {
// verify one group got created
final Set<Group> groups = authorizer.getGroups();
assertEquals(1, groups.size());
- assertEquals("group1", groups.iterator().next().getName());
+ final Group group1 = groups.iterator().next();
+ assertEquals("group1", group1.getName());
// verify more than one policy got created
final Set<AccessPolicy> policies = authorizer.getAccessPolicies();
@@ -238,7 +266,7 @@ public class FileAuthorizerTest {
// verify user4's policies
final Map<String,Set<RequestAction>> user4Policies = getResourceActions(policies, user4);
- assertEquals(5, user4Policies.size());
+ assertEquals(6, user4Policies.size());
assertTrue(user4Policies.containsKey(ResourceType.Flow.getValue()));
assertEquals(1, user4Policies.get(ResourceType.Flow.getValue()).size());
@@ -266,11 +294,25 @@ public class FileAuthorizerTest {
// verify user6's policies
final Map<String,Set<RequestAction>> user6Policies = getResourceActions(policies, user6);
- assertEquals(2, user6Policies.size());
+ assertEquals(3, user6Policies.size());
assertTrue(user6Policies.containsKey(ResourceType.SiteToSite.getValue()));
assertEquals(2, user6Policies.get(ResourceType.SiteToSite.getValue()).size());
assertTrue(user6Policies.get(ResourceType.SiteToSite.getValue()).contains(RequestAction.WRITE));
+
+ final Resource inputPortResource = ResourceFactory.getDataTransferResource(true, "2f7d1606-b090-4be7-a592-a5b70fb55531", "TCP Input");
+ final AccessPolicy inputPortPolicy = authorizer.getUsersAndAccessPolicies().getAccessPolicy(inputPortResource.getIdentifier(), RequestAction.WRITE);
+ assertNotNull(inputPortPolicy);
+ assertEquals(1, inputPortPolicy.getUsers().size());
+ assertTrue(inputPortPolicy.getUsers().contains(user6.getIdentifier()));
+ assertEquals(1, inputPortPolicy.getGroups().size());
+ assertTrue(inputPortPolicy.getGroups().contains(group1.getIdentifier()));
+
+ final Resource outputPortResource = ResourceFactory.getDataTransferResource(false, "2f7d1606-b090-4be7-a592-a5b70fb55532", "TCP Output");
+ final AccessPolicy outputPortPolicy = authorizer.getUsersAndAccessPolicies().getAccessPolicy(outputPortResource.getIdentifier(), RequestAction.WRITE);
+ assertNotNull(outputPortPolicy);
+ assertEquals(1, outputPortPolicy.getUsers().size());
+ assertTrue(outputPortPolicy.getUsers().contains(user4.getIdentifier()));
}
private Map<String,Set<RequestAction>> getResourceActions(final Set<AccessPolicy> policies, final User user) {
@@ -342,7 +384,7 @@ public class FileAuthorizerTest {
assertEquals(adminIdentity, adminUser.getIdentity());
final Set<AccessPolicy> policies = authorizer.getAccessPolicies();
- assertEquals(7, policies.size());
+ assertEquals(9, policies.size());
final String rootGroupResource = ResourceType.ProcessGroup.getValue() + "/" + ROOT_GROUP_ID;
@@ -379,7 +421,7 @@ public class FileAuthorizerTest {
assertEquals(adminIdentity, adminUser.getIdentity());
final Set<AccessPolicy> policies = authorizer.getAccessPolicies();
- assertEquals(5, policies.size());
+ assertEquals(7, policies.size());
final String rootGroupResource = ResourceType.ProcessGroup.getValue() + "/" + ROOT_GROUP_ID;
@@ -416,7 +458,7 @@ public class FileAuthorizerTest {
assertEquals(adminIdentity, adminUser.getIdentity());
final Set<AccessPolicy> policies = authorizer.getAccessPolicies();
- assertEquals(5, policies.size());
+ assertEquals(7, policies.size());
final String rootGroupResource = ResourceType.ProcessGroup.getValue() + "/" + ROOT_GROUP_ID;
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e9867d5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/resources/flow-no-ports.xml.gz
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/resources/flow-no-ports.xml.gz b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/resources/flow-no-ports.xml.gz
new file mode 100644
index 0000000..95cca27
Binary files /dev/null and b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/resources/flow-no-ports.xml.gz differ
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e9867d5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/resources/flow.xml.gz
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/resources/flow.xml.gz b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/resources/flow.xml.gz
index c2ac4ed..95cca27 100644
Binary files a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/resources/flow.xml.gz and b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-file-authorizer/src/test/resources/flow.xml.gz differ
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e9867d5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
index aa47a14..4971d62 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/authorizers.xml
@@ -37,6 +37,8 @@
- Node Identity [unique key] - The identity of a NiFi cluster node. When clustered, a property for each node
should be defined, so that every node knows about every other node. If not clustered these properties can be ignored.
+ The name of each property must be unique, for example for a three node cluster:
+ "Node Identity A", "Node Identity B", "Node Identity C" or "Node Identity 1", "Node Identity 2", "Node Identity 3"
-->
<authorizer>
<identifier>file-provider</identifier>
@@ -44,7 +46,8 @@
<property name="Authorizations File">./conf/authorizations.xml</property>
<property name="Initial Admin Identity"></property>
<property name="Legacy Authorized Users File"></property>
- <!--
+
+ <!-- Provide the identity (typically a DN) of each node when clustered, see above description of Node Identity.
<property name="Node Identity 1"></property>
<property name="Node Identity 2"></property>
-->
http://git-wip-us.apache.org/repos/asf/nifi/blob/3e9867d5/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
index e77d769..bd61841 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/DataTransferResource.java
@@ -30,6 +30,7 @@ import org.apache.nifi.authorization.AuthorizationResult.Result;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.Resource;
+import org.apache.nifi.authorization.UserContextKeys;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUser;
@@ -77,6 +78,8 @@ import javax.ws.rs.core.UriInfo;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.nifi.remote.protocol.HandshakeProperty.BATCH_COUNT;
@@ -125,6 +128,14 @@ public class DataTransferResource extends ApplicationResource {
throw new IllegalArgumentException("The resource must be an Input or Output Port.");
}
+ final Map<String,String> userContext;
+ if (user.getClientAddress() != null && !user.getClientAddress().trim().isEmpty()) {
+ userContext = new HashMap<>();
+ userContext.put(UserContextKeys.CLIENT_ADDRESS.name(), user.getClientAddress());
+ } else {
+ userContext = null;
+ }
+
// TODO - use DataTransferAuthorizable after looking up underlying component for consistentency
final Resource resource = ResourceFactory.getComponentResource(resourceType, identifier, identifier);
final AuthorizationRequest request = new AuthorizationRequest.Builder()
@@ -133,6 +144,7 @@ public class DataTransferResource extends ApplicationResource {
.anonymous(user.isAnonymous())
.accessAttempt(true)
.action(RequestAction.WRITE)
+ .userContext(userContext)
.build();
final AuthorizationResult result = authorizer.authorize(request);