You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2014/12/08 21:29:26 UTC

[03/51] [partial] incubator-nifi git commit: Initial code contribution

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
new file mode 100644
index 0000000..c67181a
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
+import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.DomUtils;
+import org.apache.nifi.web.api.dto.ConnectableDTO;
+import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.FunnelDTO;
+import org.apache.nifi.web.api.dto.LabelDTO;
+import org.apache.nifi.web.api.dto.PortDTO;
+import org.apache.nifi.web.api.dto.PositionDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
+import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
+
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+public class FlowFromDOMFactory {
+
+    /**
+     * Builds a {@link PositionDTO} from the <code>x</code> and <code>y</code>
+     * attributes of the given element.
+     *
+     * @param positionElement the element carrying the coordinates
+     * @return the position described by the element's attributes
+     * @throws IllegalArgumentException if <code>positionElement</code> is null
+     * @throws NumberFormatException if either attribute cannot be parsed as a
+     * double
+     */
+    public static PositionDTO getPosition(final Element positionElement) {
+        if (positionElement == null) {
+            throw new IllegalArgumentException("Invalid Flow: Found no 'position' element");
+        }
+
+        final double x = Double.parseDouble(positionElement.getAttribute("x"));
+        final double y = Double.parseDouble(positionElement.getAttribute("y"));
+        return new PositionDTO(x, y);
+    }
+
+    /**
+     * Builds a {@link Size} from the <code>width</code> and
+     * <code>height</code> attributes of the given element.
+     *
+     * @param sizeElement the element carrying the dimensions
+     * @return the size described by the element's attributes
+     * @throws IllegalArgumentException if <code>sizeElement</code> is null
+     * @throws NumberFormatException if either attribute cannot be parsed as a
+     * double
+     */
+    public static Size getSize(final Element sizeElement) {
+        if (sizeElement == null) {
+            throw new IllegalArgumentException("Invalid Flow: Found no 'size' element");
+        }
+
+        final double width = Double.parseDouble(sizeElement.getAttribute("width"));
+        final double height = Double.parseDouble(sizeElement.getAttribute("height"));
+        return new Size(width, height);
+    }
+
+    /**
+     * Collects the 'style' children of the given element into a map keyed by
+     * each child's <code>name</code> attribute, with the child's text content
+     * as the value.
+     *
+     * @param stylesElement the element holding the 'style' children; may be
+     * null
+     * @return a new, mutable map of style name to style value; empty if
+     * <code>stylesElement</code> is null
+     */
+    public static Map<String, String> getStyle(final Element stylesElement) {
+        final Map<String, String> styleMap = new HashMap<>();
+        if (stylesElement != null) {
+            for (final Element child : getChildrenByTagName(stylesElement, "style")) {
+                styleMap.put(child.getAttribute("name"), child.getTextContent());
+            }
+        }
+
+        return styleMap;
+    }
+
+    /**
+     * Converts a process group element, including everything nested inside
+     * it, into a {@link ProcessGroupDTO}. Child 'processGroup' elements are
+     * converted recursively with this group's id as their parent id.
+     *
+     * @param parentId the id to record as the parent group of this group
+     * @param element the element describing the process group
+     * @param encryptor passed through to {@link #getProcessor} for processor
+     * property values
+     * @return the populated DTO, with its contents set to a
+     * {@link FlowSnippetDTO} holding all child components
+     */
+    public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) {
+        final ProcessGroupDTO groupDto = new ProcessGroupDTO();
+        final String groupId = getString(element, "id");
+        groupDto.setId(groupId);
+        groupDto.setParentGroupId(parentId);
+        groupDto.setName(getString(element, "name"));
+        groupDto.setPosition(getPosition(DomUtils.getChild(element, "position")));
+        groupDto.setComments(getString(element, "comment"));
+
+        final Set<ProcessorDTO> processorDtos = new HashSet<>();
+        final Set<ConnectionDTO> connectionDtos = new HashSet<>();
+        final Set<FunnelDTO> funnelDtos = new HashSet<>();
+        final Set<PortDTO> inputPortDtos = new HashSet<>();
+        final Set<PortDTO> outputPortDtos = new HashSet<>();
+        final Set<LabelDTO> labelDtos = new HashSet<>();
+        final Set<ProcessGroupDTO> childGroupDtos = new HashSet<>();
+        final Set<RemoteProcessGroupDTO> remoteGroupDtos = new HashSet<>();
+
+        // The sets are handed to the snippet first and filled afterwards,
+        // exactly as before; keep this ordering.
+        final FlowSnippetDTO contents = new FlowSnippetDTO();
+        contents.setConnections(connectionDtos);
+        contents.setFunnels(funnelDtos);
+        contents.setInputPorts(inputPortDtos);
+        contents.setLabels(labelDtos);
+        contents.setOutputPorts(outputPortDtos);
+        contents.setProcessGroups(childGroupDtos);
+        contents.setProcessors(processorDtos);
+        contents.setRemoteProcessGroups(remoteGroupDtos);
+
+        final NodeList processorNodes = DomUtils.getChildNodesByTagName(element, "processor");
+        for (int idx = 0; idx < processorNodes.getLength(); idx++) {
+            final Element processorElement = (Element) processorNodes.item(idx);
+            processorDtos.add(getProcessor(processorElement, encryptor));
+        }
+
+        final NodeList funnelNodes = DomUtils.getChildNodesByTagName(element, "funnel");
+        for (int idx = 0; idx < funnelNodes.getLength(); idx++) {
+            final Element funnelElement = (Element) funnelNodes.item(idx);
+            funnelDtos.add(getFunnel(funnelElement));
+        }
+
+        final NodeList inputPortNodes = DomUtils.getChildNodesByTagName(element, "inputPort");
+        for (int idx = 0; idx < inputPortNodes.getLength(); idx++) {
+            final Element inputPortElement = (Element) inputPortNodes.item(idx);
+            inputPortDtos.add(getPort(inputPortElement));
+        }
+
+        final NodeList outputPortNodes = DomUtils.getChildNodesByTagName(element, "outputPort");
+        for (int idx = 0; idx < outputPortNodes.getLength(); idx++) {
+            final Element outputPortElement = (Element) outputPortNodes.item(idx);
+            outputPortDtos.add(getPort(outputPortElement));
+        }
+
+        final NodeList labelNodes = DomUtils.getChildNodesByTagName(element, "label");
+        for (int idx = 0; idx < labelNodes.getLength(); idx++) {
+            final Element labelElement = (Element) labelNodes.item(idx);
+            labelDtos.add(getLabel(labelElement));
+        }
+
+        final NodeList childGroupNodes = DomUtils.getChildNodesByTagName(element, "processGroup");
+        for (int idx = 0; idx < childGroupNodes.getLength(); idx++) {
+            final Element childGroupElement = (Element) childGroupNodes.item(idx);
+            childGroupDtos.add(getProcessGroup(groupId, childGroupElement, encryptor));
+        }
+
+        final NodeList remoteGroupNodes = DomUtils.getChildNodesByTagName(element, "remoteProcessGroup");
+        for (int idx = 0; idx < remoteGroupNodes.getLength(); idx++) {
+            final Element remoteGroupElement = (Element) remoteGroupNodes.item(idx);
+            remoteGroupDtos.add(getRemoteProcessGroup(remoteGroupElement));
+        }
+
+        final NodeList connectionNodes = DomUtils.getChildNodesByTagName(element, "connection");
+        for (int idx = 0; idx < connectionNodes.getLength(); idx++) {
+            final Element connectionElement = (Element) connectionNodes.item(idx);
+            connectionDtos.add(getConnection(connectionElement));
+        }
+
+        groupDto.setContents(contents);
+        return groupDto;
+    }
+
+    /**
+     * Converts a 'connection' element into a {@link ConnectionDTO}: identity,
+     * bend points, source and destination connectables, selected
+     * relationships, back-pressure thresholds, FlowFile expiration and queue
+     * prioritizers.
+     *
+     * @param element the element describing the connection
+     * @return the populated DTO
+     * @throws NumberFormatException if 'maxWorkQueueSize' is missing or not a
+     * valid long, or if a present 'labelIndex' / 'zIndex' is not numeric
+     */
+    public static ConnectionDTO getConnection(final Element element) {
+        final ConnectionDTO connectionDto = new ConnectionDTO();
+        connectionDto.setId(getString(element, "id"));
+        connectionDto.setName(getString(element, "name"));
+        connectionDto.setLabelIndex(getOptionalInt(element, "labelIndex"));
+        connectionDto.setzIndex(getOptionalLong(element, "zIndex"));
+
+        // Bend points are optional; an absent 'bendPoints' element yields an empty list.
+        final List<PositionDTO> bendPositions = new ArrayList<>();
+        final Element bendPointsParent = DomUtils.getChild(element, "bendPoints");
+        if (bendPointsParent != null) {
+            for (final Element bendPoint : getChildrenByTagName(bendPointsParent, "bendPoint")) {
+                bendPositions.add(getPosition(bendPoint));
+            }
+        }
+        connectionDto.setBends(bendPositions);
+
+        final ConnectableDTO source = new ConnectableDTO();
+        connectionDto.setSource(source);
+        source.setId(getString(element, "sourceId"));
+        source.setGroupId(getString(element, "sourceGroupId"));
+        source.setType(getString(element, "sourceType"));
+
+        final ConnectableDTO destination = new ConnectableDTO();
+        connectionDto.setDestination(destination);
+        destination.setId(getString(element, "destinationId"));
+        destination.setGroupId(getString(element, "destinationGroupId"));
+        destination.setType(getString(element, "destinationType"));
+
+        final Set<String> selectedRelationships = new HashSet<>();
+        for (final Element relationship : getChildrenByTagName(element, "relationship")) {
+            selectedRelationships.add(relationship.getTextContent());
+        }
+        connectionDto.setSelectedRelationships(selectedRelationships);
+
+        connectionDto.setBackPressureObjectThreshold(getLong(element, "maxWorkQueueSize"));
+
+        // Only a non-blank data size threshold is applied.
+        final String dataSizeThreshold = getString(element, "maxWorkQueueDataSize");
+        if (!(dataSizeThreshold == null || dataSizeThreshold.trim().isEmpty())) {
+            connectionDto.setBackPressureDataSizeThreshold(dataSizeThreshold);
+        }
+
+        // A missing expiration falls back to "0 sec".
+        final String configuredExpiration = getString(element, "flowFileExpiration");
+        connectionDto.setFlowFileExpiration(configuredExpiration == null ? "0 sec" : configuredExpiration);
+
+        final List<String> prioritizers = new ArrayList<>();
+        for (final Element prioritizer : getChildrenByTagName(element, "queuePrioritizerClass")) {
+            prioritizers.add(prioritizer.getTextContent().trim());
+        }
+        connectionDto.setPrioritizers(prioritizers);
+
+        return connectionDto;
+    }
+
+    /**
+     * Converts a 'remoteProcessGroup' element into a
+     * {@link RemoteProcessGroupDTO}, reading its 'id', 'name', 'url',
+     * 'transmitting', 'position', 'timeout' and 'comment' children.
+     *
+     * @param element the element describing the remote process group
+     * @return the populated DTO
+     */
+    public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element) {
+        final RemoteProcessGroupDTO remoteGroup = new RemoteProcessGroupDTO();
+
+        remoteGroup.setId(getString(element, "id"));
+        remoteGroup.setName(getString(element, "name"));
+        remoteGroup.setTargetUri(getString(element, "url"));
+        remoteGroup.setTransmitting(getBoolean(element, "transmitting"));
+
+        final Element positionElement = DomUtils.getChild(element, "position");
+        remoteGroup.setPosition(getPosition(positionElement));
+
+        remoteGroup.setCommunicationsTimeout(getString(element, "timeout"));
+        remoteGroup.setComments(getString(element, "comment"));
+
+        return remoteGroup;
+    }
+
+    /**
+     * Converts a 'label' element into a {@link LabelDTO}, reading its 'id',
+     * 'value', 'position', 'size' and 'styles' children.
+     *
+     * @param element the element describing the label
+     * @return the populated DTO
+     * @throws IllegalArgumentException if the 'position' or 'size' child is
+     * missing
+     */
+    public static LabelDTO getLabel(final Element element) {
+        final LabelDTO label = new LabelDTO();
+        label.setId(getString(element, "id"));
+        label.setLabel(getString(element, "value"));
+
+        final Element positionElement = DomUtils.getChild(element, "position");
+        label.setPosition(getPosition(positionElement));
+
+        final Element sizeElement = DomUtils.getChild(element, "size");
+        final Size labelSize = getSize(sizeElement);
+        label.setWidth(labelSize.getWidth());
+        label.setHeight(labelSize.getHeight());
+
+        final Element stylesElement = DomUtils.getChild(element, "styles");
+        label.setStyle(getStyle(stylesElement));
+
+        return label;
+    }
+
+    /**
+     * Converts a 'funnel' element into a {@link FunnelDTO} carrying its id
+     * and position.
+     *
+     * @param element the element describing the funnel
+     * @return the populated DTO
+     * @throws IllegalArgumentException if the 'position' child is missing
+     */
+    public static FunnelDTO getFunnel(final Element element) {
+        final FunnelDTO funnel = new FunnelDTO();
+        funnel.setId(getString(element, "id"));
+
+        final Element positionElement = DomUtils.getChild(element, "position");
+        funnel.setPosition(getPosition(positionElement));
+
+        return funnel;
+    }
+
+    /**
+     * Converts a port element into a {@link PortDTO}: id, position, name,
+     * comments, scheduled state, and the optional concurrent task count and
+     * user/group access control lists.
+     *
+     * @param element the element describing the port
+     * @return the populated DTO; the access control sets are only set when at
+     * least one matching child element is present
+     * @throws IllegalArgumentException if the 'position' child is missing
+     * @throws NumberFormatException if a present 'maxConcurrentTasks' value
+     * is not a valid integer
+     */
+    public static PortDTO getPort(final Element element) {
+        final PortDTO portDTO = new PortDTO();
+        portDTO.setId(getString(element, "id"));
+        portDTO.setPosition(getPosition(DomUtils.getChild(element, "position")));
+        portDTO.setName(getString(element, "name"));
+        portDTO.setComments(getString(element, "comments"));
+        final ScheduledState scheduledState = getScheduledState(element);
+        portDTO.setState(scheduledState.toString());
+
+        final List<Element> maxTasksElements = getChildrenByTagName(element, "maxConcurrentTasks");
+        if (maxTasksElements != null && !maxTasksElements.isEmpty()) {
+            // Trim before parsing so surrounding whitespace in the XML text
+            // does not make Integer.parseInt fail.
+            final String maxTasksText = maxTasksElements.get(0).getTextContent().trim();
+            portDTO.setConcurrentlySchedulableTaskCount(Integer.parseInt(maxTasksText));
+        }
+
+        final List<Element> userAccessControls = getChildrenByTagName(element, "userAccessControl");
+        if (userAccessControls != null && !userAccessControls.isEmpty()) {
+            final Set<String> users = new HashSet<>();
+            portDTO.setUserAccessControl(users);
+            for (final Element userElement : userAccessControls) {
+                users.add(userElement.getTextContent());
+            }
+        }
+
+        final List<Element> groupAccessControls = getChildrenByTagName(element, "groupAccessControl");
+        if (groupAccessControls != null && !groupAccessControls.isEmpty()) {
+            final Set<String> groups = new HashSet<>();
+            portDTO.setGroupAccessControl(groups);
+            for (final Element groupElement : groupAccessControls) {
+                groups.add(groupElement.getTextContent());
+            }
+        }
+
+        return portDTO;
+    }
+
+    /**
+     * Converts a remote process group port element into a
+     * {@link RemoteProcessGroupPortDescriptor}.
+     *
+     * @param element the element describing the remote port
+     * @return the populated descriptor; it is marked as transmitting only
+     * when the 'scheduledState' child equals "RUNNING" (case-insensitive)
+     * @throws NumberFormatException if 'maxConcurrentTasks' is missing or not
+     * a valid integer
+     */
+    public static RemoteProcessGroupPortDescriptor getRemoteProcessGroupPort(final Element element) {
+        final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
+
+        // What we have serialized is the ID of the Remote Process Group, followed by a dash ('-'), followed by
+        // the actual ID of the port; we want to get rid of the remote process group id.
+        // NOTE(review): 37 is presumably a 36-character UUID plus the dash -- confirm against the serializer.
+        final int groupIdPrefixLength = 37;
+        String id = getString(element, "id");
+        // getString returns null when there is no 'id' child; guard so that a
+        // missing id is passed through instead of raising a NullPointerException.
+        if (id != null && id.length() > groupIdPrefixLength) {
+            id = id.substring(groupIdPrefixLength);
+        }
+
+        descriptor.setId(id);
+        descriptor.setName(getString(element, "name"));
+        descriptor.setComments(getString(element, "comments"));
+        descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
+        descriptor.setUseCompression(getBoolean(element, "useCompression"));
+        descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, "scheduledState")));
+
+        return descriptor;
+    }
+
+    /**
+     * Converts a 'processor' element into a {@link ProcessorDTO} together
+     * with its {@link ProcessorConfigDTO}.
+     *
+     * @param element the element describing the processor
+     * @param encryptor used to decrypt property values wrapped in the
+     * {@link FlowSerializer#ENC_PREFIX} / {@link FlowSerializer#ENC_SUFFIX}
+     * markers
+     * @return the populated DTO
+     * @throws IllegalArgumentException if the 'position' child is missing
+     * @throws NumberFormatException if 'maxConcurrentTasks' is missing or not
+     * a valid integer, or a present 'runDurationNanos' is not numeric
+     */
+    public static ProcessorDTO getProcessor(final Element element, final StringEncryptor encryptor) {
+        final ProcessorDTO processorDto = new ProcessorDTO();
+
+        processorDto.setId(getString(element, "id"));
+        processorDto.setName(getString(element, "name"));
+        processorDto.setType(getString(element, "class"));
+        processorDto.setPosition(getPosition(DomUtils.getChild(element, "position")));
+        processorDto.setStyle(getStyle(DomUtils.getChild(element, "styles")));
+
+        final ProcessorConfigDTO config = new ProcessorConfigDTO();
+        processorDto.setConfig(config);
+        config.setComments(getString(element, "comment"));
+        config.setAnnotationData(getString(element, "annotationData"));
+        config.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
+        config.setSchedulingPeriod(getString(element, "schedulingPeriod"));
+        config.setPenaltyDuration(getString(element, "penalizationPeriod"));
+        config.setYieldDuration(getString(element, "yieldPeriod"));
+        config.setBulletinLevel(getString(element, "bulletinLevel"));
+        config.setLossTolerant(getBoolean(element, "lossTolerant"));
+        processorDto.setState(getScheduledState(element).toString());
+
+        // A missing or blank scheduling strategy defaults to TIMER_DRIVEN.
+        final String strategyText = getString(element, "schedulingStrategy");
+        final boolean strategyMissing = strategyText == null || strategyText.trim().isEmpty();
+        if (strategyMissing) {
+            config.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN.name());
+        } else {
+            config.setSchedulingStrategy(strategyText.trim());
+        }
+
+        // The run duration is stored in nanoseconds but exposed in milliseconds.
+        final Long runNanos = getOptionalLong(element, "runDurationNanos");
+        if (runNanos != null) {
+            config.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runNanos));
+        }
+
+        // LinkedHashMap keeps the properties in document order.
+        final LinkedHashMap<String, String> propertyValues = new LinkedHashMap<>();
+        for (final Element property : getChildrenByTagName(element, "property")) {
+            final String propertyName = getString(property, "name");
+            final String rawValue = getString(property, "value");
+            propertyValues.put(propertyName, decrypt(rawValue, encryptor));
+        }
+        config.setProperties(propertyValues);
+
+        final Set<String> autoTerminated = new HashSet<>();
+        for (final Element relationship : getChildrenByTagName(element, "autoTerminatedRelationship")) {
+            autoTerminated.add(relationship.getTextContent());
+        }
+        config.setAutoTerminatedRelationships(autoTerminated);
+
+        return processorDto;
+    }
+
+    /**
+     * Returns the text content of the first child of <code>element</code>
+     * with the given tag name.
+     *
+     * @param element the parent element
+     * @param childElementName the tag name of the child to read
+     * @return the child's text content, or null if there is no such child
+     */
+    private static String getString(final Element element, final String childElementName) {
+        final List<Element> matches = getChildrenByTagName(element, childElementName);
+        final boolean missing = matches == null || matches.isEmpty();
+        return missing ? null : matches.get(0).getTextContent();
+    }
+
+    /**
+     * Reads an optional integer from the first child of <code>element</code>
+     * with the given tag name.
+     *
+     * @param element the parent element
+     * @param childElementName the tag name of the child to read
+     * @return the parsed value, or null if the child is absent or its text is
+     * blank
+     * @throws NumberFormatException if the child has non-blank text that is
+     * not a valid integer
+     */
+    private static Integer getOptionalInt(final Element element, final String childElementName) {
+        final List<Element> nodeList = getChildrenByTagName(element, childElementName);
+        if (nodeList == null || nodeList.isEmpty()) {
+            return null;
+        }
+
+        // An empty element yields "" (not null) from getTextContent, so a blank
+        // value must be treated as "not specified" explicitly.
+        final String val = nodeList.get(0).getTextContent();
+        if (val == null || val.trim().isEmpty()) {
+            return null;
+        }
+        return Integer.parseInt(val.trim());
+    }
+
+    /**
+     * Reads an optional long from the first child of <code>element</code>
+     * with the given tag name.
+     *
+     * @param element the parent element
+     * @param childElementName the tag name of the child to read
+     * @return the parsed value, or null if the child is absent or its text is
+     * blank
+     * @throws NumberFormatException if the child has non-blank text that is
+     * not a valid long
+     */
+    private static Long getOptionalLong(final Element element, final String childElementName) {
+        final List<Element> nodeList = getChildrenByTagName(element, childElementName);
+        if (nodeList == null || nodeList.isEmpty()) {
+            return null;
+        }
+
+        // An empty element yields "" (not null) from getTextContent, so a blank
+        // value must be treated as "not specified" explicitly.
+        final String val = nodeList.get(0).getTextContent();
+        if (val == null || val.trim().isEmpty()) {
+            return null;
+        }
+        return Long.parseLong(val.trim());
+    }
+
+    private static int getInt(final Element element, final String childElementName) {
+        return Integer.parseInt(getString(element, childElementName));
+    }
+
+    private static long getLong(final Element element, final String childElementName) {
+        return Long.parseLong(getString(element, childElementName));
+    }
+
+    private static boolean getBoolean(final Element element, final String childElementName) {
+        return Boolean.parseBoolean(getString(element, childElementName));
+    }
+
+    private static ScheduledState getScheduledState(final Element element) {
+        return ScheduledState.valueOf(getString(element, "scheduledState"));
+    }
+
+    private static List<Element> getChildrenByTagName(final Element element, final String childElementName) {
+        return DomUtils.getChildElementsByTagName(element, childElementName);
+    }
+
+    private static String decrypt(final String value, final StringEncryptor encryptor) {
+        if (value != null && value.startsWith(FlowSerializer.ENC_PREFIX) && value.endsWith(FlowSerializer.ENC_SUFFIX)) {
+            return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length()));
+        } else {
+            return value;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java
new file mode 100644
index 0000000..f1ee760
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+/**
+ * Unchecked exception signalling that a flow configuration is malformed and
+ * therefore cannot be serialized or deserialized.
+ *
+ * @author unattributed
+ */
+public class FlowSerializationException extends RuntimeException {
+
+    private static final long serialVersionUID = 128934798237L;
+
+    /**
+     * Creates an exception with neither a detail message nor a cause.
+     */
+    public FlowSerializationException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public FlowSerializationException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying cause
+     */
+    public FlowSerializationException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public FlowSerializationException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause
+     * @param enableSuppression whether suppression is enabled
+     * @param writableStackTrace whether the stack trace is writable
+     */
+    public FlowSerializationException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java
new file mode 100644
index 0000000..331b26c
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.io.OutputStream;
+
+/**
+ * Contract for writing the flow configuration of a controller instance to an
+ * output stream.
+ *
+ * @author unattributed
+ */
+public interface FlowSerializer {
+
+    /**
+     * Opening marker of a wrapped (encrypted) value. Interface fields are
+     * implicitly <code>public static final</code>.
+     */
+    String ENC_PREFIX = "enc{";
+
+    /**
+     * Closing marker of a wrapped (encrypted) value.
+     */
+    String ENC_SUFFIX = "}";
+
+    /**
+     * Serializes the flow configuration of a controller instance.
+     *
+     * @param controller a controller
+     * @param os an output stream to write the configuration to
+     *
+     * @throws FlowSerializationException if serialization failed
+     */
+    void serialize(FlowController controller, OutputStream os) throws FlowSerializationException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
new file mode 100644
index 0000000..706ac46
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+/**
+ * Unchecked exception signalling that a controller which is managing an
+ * existing flow failed to fully load a different flow.
+ *
+ * @author unattributed
+ */
+public class FlowSynchronizationException extends RuntimeException {
+
+    private static final long serialVersionUID = 109234802938L;
+
+    /**
+     * Creates an exception with neither a detail message nor a cause.
+     */
+    public FlowSynchronizationException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public FlowSynchronizationException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying cause
+     */
+    public FlowSynchronizationException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public FlowSynchronizationException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause
+     * @param enableSuppression whether suppression is enabled
+     * @param writableStackTrace whether the stack trace is writable
+     */
+    public FlowSynchronizationException(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java
new file mode 100644
index 0000000..f6889fe
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.cluster.protocol.DataFlow;
+import org.apache.nifi.encrypt.StringEncryptor;
+
+/**
+ * Contract for synchronizing a flow controller with a flow configuration.
+ *
+ * @author unattributed
+ */
+public interface FlowSynchronizer {
+
+    /**
+     * Synchronizes the given controller with the given flow configuration.
+     * <p>
+     * If loading the proposed flow configuration would cause the controller
+     * to orphan flow files, an UninheritableFlowException is thrown.
+     * <p>
+     * If a FlowSynchronizationException is thrown, the controller may have
+     * changed some of its state and should no longer be used.
+     *
+     * @param controller the flow controller
+     * @param dataFlow the flow to load the controller with. If the flow is
+     * null or zero length, then the controller must not have a flow, or else
+     * an UninheritableFlowException will be thrown.
+     * @param encryptor used for the encryption/decryption of sensitive
+     * property values
+     *
+     * @throws FlowSerializationException if the proposed flow is not a valid
+     * flow configuration file
+     * @throws UninheritableFlowException if the proposed flow cannot be
+     * loaded by the controller because doing so would risk orphaning flow
+     * files
+     * @throws FlowSynchronizationException if updates to the controller
+     * failed. If this exception is thrown, the controller should be
+     * considered unsafe to use.
+     */
+    void sync(
+            FlowController controller,
+            DataFlow dataFlow,
+            StringEncryptor encryptor)
+            throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
new file mode 100644
index 0000000..fa33b49
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.io.ByteArrayInputStream;
+import org.apache.nifi.web.api.dto.FlowSnippetDTO;
+import org.apache.nifi.web.api.dto.ProcessGroupDTO;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+public class FlowUnmarshaller {
+
+    /**
+     * Interprets the given byte array as an XML document that conforms to the
+     * Flow Configuration schema and returns a FlowSnippetDTO representing the
+     * flow.
+     *
+     * The document is parsed with DOCTYPE declarations disallowed and entity
+     * reference expansion disabled, so that the supplied bytes cannot trigger
+     * XML External Entity (XXE) processing.
+     *
+     * @param flowContents the raw bytes of the flow XML; a zero-length array
+     * yields an empty FlowSnippetDTO
+     * @param encryptor passed through to FlowFromDOMFactory when the root
+     * group element is converted
+     * @return a FlowSnippetDTO that is empty if the contents are zero-length
+     * or contain no <code>rootGroup</code> element; otherwise its process
+     * groups consist of the single root group found in the document
+     * @throws NullPointerException if <code>flowContents</code> is null
+     * @throws IllegalArgumentException if the document contains more than one
+     * <code>rootGroup</code> element
+     * @throws IOException if the contents cannot be read
+     * @throws SAXException if the contents cannot be parsed as XML or declare
+     * a DOCTYPE
+     * @throws ParserConfigurationException if an XML parser with the required
+     * configuration cannot be created
+     */
+    public static FlowSnippetDTO unmarshal(final byte[] flowContents, final StringEncryptor encryptor) throws IOException, SAXException, ParserConfigurationException {
+        if (Objects.requireNonNull(flowContents).length == 0) {
+            return new FlowSnippetDTO();
+        }
+
+        final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+        // Refuse DOCTYPE declarations outright and never expand entity references:
+        // this closes the XXE hole that a default-configured parser leaves open.
+        dbf.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
+        dbf.setExpandEntityReferences(false);
+        final DocumentBuilder docBuilder = dbf.newDocumentBuilder();
+
+        final Document document = docBuilder.parse(new ByteArrayInputStream(flowContents));
+        final FlowSnippetDTO flowDto = new FlowSnippetDTO();
+
+        final NodeList nodeList = document.getElementsByTagName("rootGroup");
+        if (nodeList.getLength() == 0) {
+            return flowDto;
+        }
+        if (nodeList.getLength() > 1) {
+            throw new IllegalArgumentException("Contents contain multiple rootGroup elements");
+        }
+
+        // exactly one rootGroup element: convert it and expose it as the only process group
+        final Set<ProcessGroupDTO> rootGroupSet = new HashSet<>();
+        flowDto.setProcessGroups(rootGroupSet);
+        rootGroupSet.add(FlowFromDOMFactory.getProcessGroup(null, (Element) nodeList.item(0), encryptor));
+
+        return flowDto;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java
new file mode 100644
index 0000000..415472f
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.nifi.io.ByteArrayInputStream;
+import org.apache.nifi.io.ByteArrayOutputStream;
+import org.apache.nifi.io.DataOutputStream;
+import org.apache.nifi.io.StreamUtils;
+import org.apache.nifi.persistence.StandardSnippetDeserializer;
+import org.apache.nifi.persistence.StandardSnippetSerializer;
+
+public class SnippetManager {
+
+    private final ConcurrentMap<String, StandardSnippet> snippetMap = new ConcurrentHashMap<>();
+
+    public void addSnippet(final StandardSnippet snippet) {
+        final StandardSnippet oldSnippet = this.snippetMap.putIfAbsent(snippet.getId(), snippet);
+        if (oldSnippet != null) {
+            throw new IllegalStateException("Snippet with ID " + snippet.getId() + " already exists");
+        }
+    }
+
+    public void removeSnippet(final StandardSnippet snippet) {
+        if (!snippetMap.remove(snippet.getId(), snippet)) {
+            throw new IllegalStateException("Snippet is not contained in this SnippetManager");
+        }
+    }
+
+    public StandardSnippet getSnippet(final String identifier) {
+        return snippetMap.get(identifier);
+    }
+
+    public Collection<StandardSnippet> getSnippets() {
+        return snippetMap.values();
+    }
+
+    public void clear() {
+        snippetMap.clear();
+    }
+
+    public static List<StandardSnippet> parseBytes(final byte[] bytes) {
+        final List<StandardSnippet> snippets = new ArrayList<>();
+
+        try (final InputStream rawIn = new ByteArrayInputStream(bytes);
+                final DataInputStream in = new DataInputStream(rawIn)) {
+            final int length = in.readInt();
+            final byte[] buffer = new byte[length];
+            StreamUtils.fillBuffer(in, buffer, true);
+            final StandardSnippet snippet = StandardSnippetDeserializer.deserialize(new ByteArrayInputStream(buffer));
+            snippets.add(snippet);
+        } catch (final IOException e) {
+            throw new RuntimeException("Failed to parse bytes", e);  // should never happen because of streams being used
+        }
+
+        return snippets;
+    }
+
+    public byte[] export() {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                final DataOutputStream dos = new DataOutputStream(baos)) {
+            for (final StandardSnippet snippet : getSnippets()) {
+                final byte[] bytes = StandardSnippetSerializer.serialize(snippet);
+                dos.writeInt(bytes.length);
+                dos.write(bytes);
+            }
+
+            return baos.toByteArray();
+        } catch (final IOException e) {
+            // won't happen
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java
new file mode 100644
index 0000000..2899a85
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import org.apache.nifi.controller.Counter;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A Counter whose value is held in an AtomicLong.
+ */
+public class StandardCounter implements Counter {
+
+    private final String identifier;
+    private final String context;
+    private final String name;
+    private final AtomicLong value;
+
+    /**
+     * Creates a counter whose value starts at zero.
+     *
+     * @param identifier the identifier of the counter
+     * @param context the context of the counter
+     * @param name the name of the counter
+     */
+    public StandardCounter(final String identifier, final String context, final String name) {
+        this.identifier = identifier;
+        this.context = context;
+        this.name = name;
+        this.value = new AtomicLong(0L);
+    }
+
+    /**
+     * Atomically adds the given delta to the counter's value.
+     *
+     * @param delta the amount to add; may be negative
+     */
+    public void adjust(final long delta) {
+        this.value.addAndGet(delta);
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public long getValue() {
+        return this.value.get();
+    }
+
+    @Override
+    public String getContext() {
+        return context;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    /**
+     * Sets the counter's value back to zero.
+     */
+    public void reset() {
+        this.value.set(0);
+    }
+
+    @Override
+    public String toString() {
+        return "Counter[identifier=" + identifier + ", context=" + context + ", name=" + name + ", value=" + value + ']';
+    }
+
+    /**
+     * @param counter the counter to wrap
+     * @return a counter that reports the state of the given counter but
+     * rejects modification
+     */
+    public static UnmodifiableCounter unmodifiableCounter(final Counter counter) {
+        return new UnmodifiableCounter(counter);
+    }
+
+    /**
+     * A read-only wrapper around another Counter. All getters and toString()
+     * delegate to the wrapped counter; adjust() and reset() throw
+     * UnsupportedOperationException.
+     */
+    static class UnmodifiableCounter extends StandardCounter {
+
+        private final Counter counter;
+
+        public UnmodifiableCounter(final Counter counter) {
+            // the superclass state is never read; every accessor below goes to the wrapped counter
+            super(counter.getIdentifier(), counter.getContext(), counter.getName());
+            this.counter = counter;
+        }
+
+        @Override
+        public void adjust(long delta) {
+            throw new UnsupportedOperationException("Cannot modify value of UnmodifiableCounter");
+        }
+
+        @Override
+        public void reset() {
+            // the inherited reset() would only zero the unused superclass value and
+            // silently leave the wrapped counter untouched; reject it like adjust()
+            throw new UnsupportedOperationException("Cannot modify value of UnmodifiableCounter");
+        }
+
+        @Override
+        public String getName() {
+            return counter.getName();
+        }
+
+        @Override
+        public long getValue() {
+            return counter.getValue();
+        }
+
+        @Override
+        public String getContext() {
+            return counter.getContext();
+        }
+
+        @Override
+        public String getIdentifier() {
+            return counter.getIdentifier();
+        }
+
+        @Override
+        public String toString() {
+            return counter.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
new file mode 100644
index 0000000..e08a94d
--- /dev/null
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+import java.io.BufferedOutputStream;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.OutputKeys;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.TransformerFactoryConfigurationError;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.connectable.ConnectableType;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.connectable.Funnel;
+import org.apache.nifi.connectable.Port;
+import org.apache.nifi.connectable.Position;
+import org.apache.nifi.connectable.Size;
+import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.remote.RootGroupPort;
+
+import org.w3c.dom.DOMException;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+/**
+ * Serializes a Flow Controller as XML to an output stream.
+ *
+ * NOT THREAD-SAFE.
+ */
+public class StandardFlowSerializer implements FlowSerializer {
+
+    private final StringEncryptor encryptor;
+
+    /**
+     * @param encryptor used to encrypt the values of sensitive processor
+     * properties before they are written
+     */
+    public StandardFlowSerializer(final StringEncryptor encryptor) {
+        this.encryptor = encryptor;
+    }
+
+    /**
+     * Builds a DOM document rooted at <code>flowController</code> from the
+     * controller's thread counts and root process group, then writes it to
+     * the given stream as indented XML. The stream is flushed but not closed.
+     *
+     * @param controller the controller whose flow is to be serialized
+     * @param os the stream to write the XML to
+     * @throws FlowSerializationException if the document cannot be built,
+     * transformed, or written
+     */
+    @Override
+    public void serialize(final FlowController controller, final OutputStream os) throws FlowSerializationException {
+        try {
+            // create a new, empty document
+            final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
+            final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
+            final Document doc = docBuilder.newDocument();
+
+            // populate document with controller state
+            final Element rootNode = doc.createElement("flowController");
+            doc.appendChild(rootNode);
+            addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
+            addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
+            addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup");
+
+            final DOMSource domSource = new DOMSource(doc);
+            final BufferedOutputStream bufferedOut = new BufferedOutputStream(os);
+            final StreamResult streamResult = new StreamResult(bufferedOut);
+
+            // configure the transformer and convert the DOM
+            final TransformerFactory transformFactory = TransformerFactory.newInstance();
+            final Transformer transformer = transformFactory.newTransformer();
+            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
+            transformer.setOutputProperty(OutputKeys.INDENT, "yes");
+
+            // transform the document to byte stream
+            transformer.transform(domSource, streamResult);
+
+            // the buffer is local to this method and never closed, so flush it explicitly
+            // rather than depending on the transformer implementation to do so
+            bufferedOut.flush();
+
+        } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException | java.io.IOException e) {
+            throw new FlowSerializationException(e);
+        }
+    }
+
+    /**
+     * Appends a <code>size</code> element with <code>width</code> and
+     * <code>height</code> attributes to the parent.
+     */
+    private void addSize(final Element parentElement, final Size size) {
+        final Element element = parentElement.getOwnerDocument().createElement("size");
+        element.setAttribute("width", String.valueOf(size.getWidth()));
+        element.setAttribute("height", String.valueOf(size.getHeight()));
+        parentElement.appendChild(element);
+    }
+
+    /**
+     * Appends a <code>position</code> element for the given position.
+     */
+    private void addPosition(final Element parentElement, final Position position) {
+        addPosition(parentElement, position, "position");
+    }
+
+    /**
+     * Appends an element of the given name with <code>x</code> and
+     * <code>y</code> attributes taken from the position.
+     */
+    private void addPosition(final Element parentElement, final Position position, final String elementName) {
+        final Element element = parentElement.getOwnerDocument().createElement(elementName);
+        element.setAttribute("x", String.valueOf(position.getX()));
+        element.setAttribute("y", String.valueOf(position.getY()));
+        parentElement.appendChild(element);
+    }
+
+    /**
+     * Appends an element of the given name describing the process group and,
+     * recursively, everything it contains: processors, ports, labels,
+     * funnels, child groups, remote process groups and connections.
+     */
+    private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName) {
+        final Document doc = parentElement.getOwnerDocument();
+        final Element element = doc.createElement(elementName);
+        parentElement.appendChild(element);
+        addTextElement(element, "id", group.getIdentifier());
+        addTextElement(element, "name", group.getName());
+        addPosition(element, group.getPosition());
+        addTextElement(element, "comment", group.getComments());
+
+        for (final ProcessorNode processor : group.getProcessors()) {
+            addProcessor(element, processor);
+        }
+
+        // ports of the root group are written as RootGroupPorts, which carry extra settings
+        if (group.isRootGroup()) {
+            for (final Port port : group.getInputPorts()) {
+                addRootGroupPort(element, (RootGroupPort) port, "inputPort");
+            }
+
+            for (final Port port : group.getOutputPorts()) {
+                addRootGroupPort(element, (RootGroupPort) port, "outputPort");
+            }
+        } else {
+            for (final Port port : group.getInputPorts()) {
+                addPort(element, port, "inputPort");
+            }
+
+            for (final Port port : group.getOutputPorts()) {
+                addPort(element, port, "outputPort");
+            }
+        }
+
+        for (final Label label : group.getLabels()) {
+            addLabel(element, label);
+        }
+
+        for (final Funnel funnel : group.getFunnels()) {
+            addFunnel(element, funnel);
+        }
+
+        for (final ProcessGroup childGroup : group.getProcessGroups()) {
+            addProcessGroup(element, childGroup, "processGroup");
+        }
+
+        for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) {
+            addRemoteProcessGroup(element, remoteRef);
+        }
+
+        for (final Connection connection : group.getConnections()) {
+            addConnection(element, connection);
+        }
+    }
+
+    /**
+     * Appends a <code>styles</code> element holding one <code>style</code>
+     * child per map entry; the entry key becomes the <code>name</code>
+     * attribute and the entry value the text content.
+     */
+    private void addStyle(final Element parentElement, final Map<String, String> style) {
+        final Element element = parentElement.getOwnerDocument().createElement("styles");
+
+        for (final Map.Entry<String, String> entry : style.entrySet()) {
+            final Element styleElement = parentElement.getOwnerDocument().createElement("style");
+            styleElement.setAttribute("name", entry.getKey());
+            styleElement.setTextContent(entry.getValue());
+            element.appendChild(styleElement);
+        }
+
+        parentElement.appendChild(element);
+    }
+
+    /**
+     * Appends a <code>label</code> element with the label's id, position,
+     * size, styles and value.
+     */
+    private void addLabel(final Element parentElement, final Label label) {
+        final Document doc = parentElement.getOwnerDocument();
+        final Element element = doc.createElement("label");
+        parentElement.appendChild(element);
+        addTextElement(element, "id", label.getIdentifier());
+
+        addPosition(element, label.getPosition());
+        addSize(element, label.getSize());
+        addStyle(element, label.getStyle());
+
+        addTextElement(element, "value", label.getValue());
+    }
+
+    /**
+     * Appends a <code>funnel</code> element with the funnel's id and
+     * position.
+     */
+    private void addFunnel(final Element parentElement, final Funnel funnel) {
+        final Document doc = parentElement.getOwnerDocument();
+        final Element element = doc.createElement("funnel");
+        parentElement.appendChild(element);
+        addTextElement(element, "id", funnel.getIdentifier());
+        addPosition(element, funnel.getPosition());
+    }
+
+    /**
+     * Appends a <code>remoteProcessGroup</code> element. Input ports are
+     * written only if they have an incoming connection, and output ports only
+     * if they have at least one connection.
+     */
+    private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef) {
+        final Document doc = parentElement.getOwnerDocument();
+        final Element element = doc.createElement("remoteProcessGroup");
+        parentElement.appendChild(element);
+        addTextElement(element, "id", remoteRef.getIdentifier());
+        addTextElement(element, "name", remoteRef.getName());
+        addPosition(element, remoteRef.getPosition());
+        addTextElement(element, "comment", remoteRef.getComments());
+        addTextElement(element, "url", remoteRef.getTargetUri().toString());
+        addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout());
+        addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration());
+        addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting()));
+
+        for (final RemoteGroupPort port : remoteRef.getInputPorts()) {
+            if (port.hasIncomingConnection()) {
+                addRemoteGroupPort(element, port, "inputPort");
+            }
+        }
+
+        for (final RemoteGroupPort port : remoteRef.getOutputPorts()) {
+            if (!port.getConnections().isEmpty()) {
+                addRemoteGroupPort(element, port, "outputPort");
+            }
+        }
+    }
+
+    /**
+     * Appends an element of the given name describing a port of a remote
+     * process group, including its concurrency and compression settings.
+     */
+    private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPort port, final String elementName) {
+        final Document doc = parentElement.getOwnerDocument();
+        final Element element = doc.createElement(elementName);
+        parentElement.appendChild(element);
+        addTextElement(element, "id", port.getIdentifier());
+        addTextElement(element, "name", port.getName());
+        addPosition(element, port.getPosition());
+        addTextElement(element, "comments", port.getComments());
+        addTextElement(element, "scheduledState", port.getScheduledState().name());
+        addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
+        addTextElement(element, "useCompression", String.valueOf(port.isUseCompression()));
+    }
+
+    /**
+     * Appends an element of the given name describing a port of a non-root
+     * process group.
+     */
+    private void addPort(final Element parentElement, final Port port, final String elementName) {
+        final Document doc = parentElement.getOwnerDocument();
+        final Element element = doc.createElement(elementName);
+        parentElement.appendChild(element);
+        addTextElement(element, "id", port.getIdentifier());
+        addTextElement(element, "name", port.getName());
+        addPosition(element, port.getPosition());
+        addTextElement(element, "comments", port.getComments());
+        addTextElement(element, "scheduledState", port.getScheduledState().name());
+    }
+
+    /**
+     * Appends an element of the given name describing a port of the root
+     * process group, including its concurrency setting and one
+     * <code>userAccessControl</code> / <code>groupAccessControl</code> child
+     * per permitted user / group.
+     */
+    private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName) {
+        final Document doc = parentElement.getOwnerDocument();
+        final Element element = doc.createElement(elementName);
+        parentElement.appendChild(element);
+        addTextElement(element, "id", port.getIdentifier());
+        addTextElement(element, "name", port.getName());
+        addPosition(element, port.getPosition());
+        addTextElement(element, "comments", port.getComments());
+        addTextElement(element, "scheduledState", port.getScheduledState().name());
+        addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks()));
+        for (final String user : port.getUserAccessControl()) {
+            addTextElement(element, "userAccessControl", user);
+        }
+        for (final String group : port.getGroupAccessControl()) {
+            addTextElement(element, "groupAccessControl", group);
+        }
+    }
+
+    /**
+     * Appends a <code>processor</code> element with the processor's
+     * identity, layout, scheduling settings, properties, annotation data and
+     * auto-terminated relationships. Non-null values of sensitive properties
+     * are encrypted and wrapped in ENC_PREFIX / ENC_SUFFIX.
+     */
+    private void addProcessor(final Element parentElement, final ProcessorNode processor) {
+        final Document doc = parentElement.getOwnerDocument();
+        final Element element = doc.createElement("processor");
+        parentElement.appendChild(element);
+        addTextElement(element, "id", processor.getIdentifier());
+        addTextElement(element, "name", processor.getName());
+
+        addPosition(element, processor.getPosition());
+        addStyle(element, processor.getStyle());
+
+        addTextElement(element, "comment", processor.getComments());
+        addTextElement(element, "class", processor.getProcessor().getClass().getCanonicalName());
+        addTextElement(element, "maxConcurrentTasks", processor.getMaxConcurrentTasks());
+        addTextElement(element, "schedulingPeriod", processor.getSchedulingPeriod());
+        addTextElement(element, "penalizationPeriod", processor.getPenalizationPeriod());
+        addTextElement(element, "yieldPeriod", processor.getYieldPeriod());
+        addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString());
+        addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
+        addTextElement(element, "scheduledState", processor.getScheduledState().name());
+        addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
+        addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
+
+        // properties.
+        for (final Map.Entry<PropertyDescriptor, String> entry : processor.getProperties().entrySet()) {
+            final PropertyDescriptor descriptor = entry.getKey();
+            String value = entry.getValue();
+
+            if (value != null && descriptor.isSensitive()) {
+                value = ENC_PREFIX + encryptor.encrypt(value) + ENC_SUFFIX;
+            }
+
+            // an unset property is written with the descriptor's default value, as-is
+            if (value == null) {
+                value = descriptor.getDefaultValue();
+            }
+
+            final Element propElement = doc.createElement("property");
+            addTextElement(propElement, "name", descriptor.getName());
+            // the value child is omitted entirely when there is neither a value nor a default
+            if (value != null) {
+                addTextElement(propElement, "value", value);
+            }
+
+            element.appendChild(propElement);
+        }
+
+        final String annotationData = processor.getAnnotationData();
+        if (annotationData != null) {
+            addTextElement(element, "annotationData", annotationData);
+        }
+
+        for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
+            addTextElement(element, "autoTerminatedRelationship", rel.getName());
+        }
+    }
+
+    /**
+     * Appends a <code>connection</code> element with the connection's
+     * identity, bend points, source and destination references,
+     * relationships, and queue settings (back pressure thresholds, expiration
+     * and prioritizer classes).
+     */
+    private void addConnection(final Element parentElement, final Connection connection) {
+        final Document doc = parentElement.getOwnerDocument();
+        final Element element = doc.createElement("connection");
+        parentElement.appendChild(element);
+        addTextElement(element, "id", connection.getIdentifier());
+        addTextElement(element, "name", connection.getName());
+
+        final Element bendPointsElement = doc.createElement("bendPoints");
+        element.appendChild(bendPointsElement);
+        for (final Position bendPoint : connection.getBendPoints()) {
+            addPosition(bendPointsElement, bendPoint, "bendPoint");
+        }
+
+        addTextElement(element, "labelIndex", connection.getLabelIndex());
+        addTextElement(element, "zIndex", connection.getZIndex());
+
+        // for a remote port the group ID written is that of its remote process group
+        final String sourceId = connection.getSource().getIdentifier();
+        final ConnectableType sourceType = connection.getSource().getConnectableType();
+        final String sourceGroupId;
+        if (sourceType == ConnectableType.REMOTE_OUTPUT_PORT) {
+            sourceGroupId = ((RemoteGroupPort) connection.getSource()).getRemoteProcessGroup().getIdentifier();
+        } else {
+            sourceGroupId = connection.getSource().getProcessGroup().getIdentifier();
+        }
+
+        final ConnectableType destinationType = connection.getDestination().getConnectableType();
+        final String destinationId = connection.getDestination().getIdentifier();
+        final String destinationGroupId;
+        if (destinationType == ConnectableType.REMOTE_INPUT_PORT) {
+            destinationGroupId = ((RemoteGroupPort) connection.getDestination()).getRemoteProcessGroup().getIdentifier();
+        } else {
+            destinationGroupId = connection.getDestination().getProcessGroup().getIdentifier();
+        }
+
+        addTextElement(element, "sourceId", sourceId);
+        addTextElement(element, "sourceGroupId", sourceGroupId);
+        addTextElement(element, "sourceType", sourceType.toString());
+
+        addTextElement(element, "destinationId", destinationId);
+        addTextElement(element, "destinationGroupId", destinationGroupId);
+        addTextElement(element, "destinationType", destinationType.toString());
+
+        for (final Relationship relationship : connection.getRelationships()) {
+            addTextElement(element, "relationship", relationship.getName());
+        }
+
+        addTextElement(element, "maxWorkQueueSize", connection.getFlowFileQueue().getBackPressureObjectThreshold());
+        addTextElement(element, "maxWorkQueueDataSize", connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
+
+        addTextElement(element, "flowFileExpiration", connection.getFlowFileQueue().getFlowFileExpiration());
+        for (final FlowFilePrioritizer comparator : connection.getFlowFileQueue().getPriorities()) {
+            final String className = comparator.getClass().getCanonicalName();
+            addTextElement(element, "queuePrioritizerClass", className);
+        }
+    }
+
+    /**
+     * Appends a child element of the given name whose text content is the
+     * decimal form of the given value.
+     */
+    private void addTextElement(final Element element, final String name, final long value) {
+        addTextElement(element, name, String.valueOf(value));
+    }
+
+    /**
+     * Appends a child element of the given name with the given text content.
+     */
+    private void addTextElement(final Element element, final String name, final String value) {
+        final Document doc = element.getOwnerDocument();
+        final Element toAdd = doc.createElement(name);
+        toAdd.setTextContent(value);
+        element.appendChild(toAdd);
+    }
+
+}