You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/01/21 07:48:14 UTC
[06/51] [partial] incubator-nifi git commit: NIFI-270 Made all
changes identified by adam, mark, joey to prep for a cleaner build
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
deleted file mode 100644
index c67181a..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
+++ /dev/null
@@ -1,418 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.connectable.Size;
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
-import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
-import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.util.DomUtils;
-import org.apache.nifi.web.api.dto.ConnectableDTO;
-import org.apache.nifi.web.api.dto.ConnectionDTO;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.FunnelDTO;
-import org.apache.nifi.web.api.dto.LabelDTO;
-import org.apache.nifi.web.api.dto.PortDTO;
-import org.apache.nifi.web.api.dto.PositionDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorDTO;
-import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-public class FlowFromDOMFactory {
-
- public static PositionDTO getPosition(final Element positionElement) {
- if (positionElement == null) {
- throw new IllegalArgumentException("Invalid Flow: Found no 'position' element");
- }
- return new PositionDTO(Double.parseDouble(positionElement.getAttribute("x")), Double.parseDouble(positionElement.getAttribute("y")));
- }
-
- public static Size getSize(final Element sizeElement) {
- if (sizeElement == null) {
- throw new IllegalArgumentException("Invalid Flow: Found no 'size' element");
- }
-
- return new Size(Double.parseDouble(sizeElement.getAttribute("width")), Double.parseDouble(sizeElement.getAttribute("height")));
- }
-
- public static Map<String, String> getStyle(final Element stylesElement) {
- final Map<String, String> styles = new HashMap<>();
- if (stylesElement == null) {
- return styles;
- }
-
- for (final Element styleElement : getChildrenByTagName(stylesElement, "style")) {
- final String styleName = styleElement.getAttribute("name");
- final String styleValue = styleElement.getTextContent();
- styles.put(styleName, styleValue);
- }
-
- return styles;
- }
-
- public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) {
- final ProcessGroupDTO dto = new ProcessGroupDTO();
- final String groupId = getString(element, "id");
- dto.setId(groupId);
- dto.setParentGroupId(parentId);
- dto.setName(getString(element, "name"));
- dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
- dto.setComments(getString(element, "comment"));
-
- final Set<ProcessorDTO> processors = new HashSet<>();
- final Set<ConnectionDTO> connections = new HashSet<>();
- final Set<FunnelDTO> funnels = new HashSet<>();
- final Set<PortDTO> inputPorts = new HashSet<>();
- final Set<PortDTO> outputPorts = new HashSet<>();
- final Set<LabelDTO> labels = new HashSet<>();
- final Set<ProcessGroupDTO> processGroups = new HashSet<>();
- final Set<RemoteProcessGroupDTO> remoteProcessGroups = new HashSet<>();
-
- final FlowSnippetDTO groupContents = new FlowSnippetDTO();
- groupContents.setConnections(connections);
- groupContents.setFunnels(funnels);
- groupContents.setInputPorts(inputPorts);
- groupContents.setLabels(labels);
- groupContents.setOutputPorts(outputPorts);
- groupContents.setProcessGroups(processGroups);
- groupContents.setProcessors(processors);
- groupContents.setRemoteProcessGroups(remoteProcessGroups);
-
- NodeList nodeList = DomUtils.getChildNodesByTagName(element, "processor");
- for (int i = 0; i < nodeList.getLength(); i++) {
- processors.add(getProcessor((Element) nodeList.item(i), encryptor));
- }
-
- nodeList = DomUtils.getChildNodesByTagName(element, "funnel");
- for (int i = 0; i < nodeList.getLength(); i++) {
- funnels.add(getFunnel((Element) nodeList.item(i)));
- }
-
- nodeList = DomUtils.getChildNodesByTagName(element, "inputPort");
- for (int i = 0; i < nodeList.getLength(); i++) {
- inputPorts.add(getPort((Element) nodeList.item(i)));
- }
-
- nodeList = DomUtils.getChildNodesByTagName(element, "outputPort");
- for (int i = 0; i < nodeList.getLength(); i++) {
- outputPorts.add(getPort((Element) nodeList.item(i)));
- }
-
- nodeList = DomUtils.getChildNodesByTagName(element, "label");
- for (int i = 0; i < nodeList.getLength(); i++) {
- labels.add(getLabel((Element) nodeList.item(i)));
- }
-
- nodeList = DomUtils.getChildNodesByTagName(element, "processGroup");
- for (int i = 0; i < nodeList.getLength(); i++) {
- processGroups.add(getProcessGroup(groupId, (Element) nodeList.item(i), encryptor));
- }
-
- nodeList = DomUtils.getChildNodesByTagName(element, "remoteProcessGroup");
- for (int i = 0; i < nodeList.getLength(); i++) {
- remoteProcessGroups.add(getRemoteProcessGroup((Element) nodeList.item(i)));
- }
-
- nodeList = DomUtils.getChildNodesByTagName(element, "connection");
- for (int i = 0; i < nodeList.getLength(); i++) {
- connections.add(getConnection((Element) nodeList.item(i)));
- }
-
- dto.setContents(groupContents);
- return dto;
- }
-
- public static ConnectionDTO getConnection(final Element element) {
- final ConnectionDTO dto = new ConnectionDTO();
- dto.setId(getString(element, "id"));
- dto.setName(getString(element, "name"));
- dto.setLabelIndex(getOptionalInt(element, "labelIndex"));
- dto.setzIndex(getOptionalLong(element, "zIndex"));
-
- final List<PositionDTO> bends = new ArrayList<>();
- final Element bendPointsElement = DomUtils.getChild(element, "bendPoints");
- if (bendPointsElement != null) {
- for (final Element bendPointElement : getChildrenByTagName(bendPointsElement, "bendPoint")) {
- final PositionDTO bend = getPosition(bendPointElement);
- bends.add(bend);
- }
- }
- dto.setBends(bends);
-
- final ConnectableDTO sourceConnectable = new ConnectableDTO();
- dto.setSource(sourceConnectable);
- sourceConnectable.setId(getString(element, "sourceId"));
- sourceConnectable.setGroupId(getString(element, "sourceGroupId"));
- sourceConnectable.setType(getString(element, "sourceType"));
-
- final ConnectableDTO destConnectable = new ConnectableDTO();
- dto.setDestination(destConnectable);
- destConnectable.setId(getString(element, "destinationId"));
- destConnectable.setGroupId(getString(element, "destinationGroupId"));
- destConnectable.setType(getString(element, "destinationType"));
-
- final Set<String> relationships = new HashSet<>();
- final List<Element> relationshipNodeList = getChildrenByTagName(element, "relationship");
- for (final Element relationshipElem : relationshipNodeList) {
- relationships.add(relationshipElem.getTextContent());
- }
- dto.setSelectedRelationships(relationships);
-
- dto.setBackPressureObjectThreshold(getLong(element, "maxWorkQueueSize"));
-
- final String maxDataSize = getString(element, "maxWorkQueueDataSize");
- if (maxDataSize != null && !maxDataSize.trim().isEmpty()) {
- dto.setBackPressureDataSizeThreshold(maxDataSize);
- }
-
- String expiration = getString(element, "flowFileExpiration");
- if (expiration == null) {
- expiration = "0 sec";
- }
- dto.setFlowFileExpiration(expiration);
-
- final List<String> prioritizerClasses = new ArrayList<>();
- final List<Element> prioritizerNodeList = getChildrenByTagName(element, "queuePrioritizerClass");
- for (final Element prioritizerElement : prioritizerNodeList) {
- prioritizerClasses.add(prioritizerElement.getTextContent().trim());
- }
- dto.setPrioritizers(prioritizerClasses);
-
- return dto;
- }
-
- public static RemoteProcessGroupDTO getRemoteProcessGroup(final Element element) {
- final RemoteProcessGroupDTO dto = new RemoteProcessGroupDTO();
- dto.setId(getString(element, "id"));
- dto.setName(getString(element, "name"));
- dto.setTargetUri(getString(element, "url"));
- dto.setTransmitting(getBoolean(element, "transmitting"));
- dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
- dto.setCommunicationsTimeout(getString(element, "timeout"));
- dto.setComments(getString(element, "comment"));
-
- return dto;
- }
-
- public static LabelDTO getLabel(final Element element) {
- final LabelDTO dto = new LabelDTO();
- dto.setId(getString(element, "id"));
- dto.setLabel(getString(element, "value"));
- dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
- final Size size = getSize(DomUtils.getChild(element, "size"));
- dto.setWidth(size.getWidth());
- dto.setHeight(size.getHeight());
- dto.setStyle(getStyle(DomUtils.getChild(element, "styles")));
-
- return dto;
- }
-
- public static FunnelDTO getFunnel(final Element element) {
- final FunnelDTO dto = new FunnelDTO();
- dto.setId(getString(element, "id"));
- dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
-
- return dto;
- }
-
- public static PortDTO getPort(final Element element) {
- final PortDTO portDTO = new PortDTO();
- portDTO.setId(getString(element, "id"));
- portDTO.setPosition(getPosition(DomUtils.getChild(element, "position")));
- portDTO.setName(getString(element, "name"));
- portDTO.setComments(getString(element, "comments"));
- final ScheduledState scheduledState = getScheduledState(element);
- portDTO.setState(scheduledState.toString());
-
- final List<Element> maxTasksElements = getChildrenByTagName(element, "maxConcurrentTasks");
- if (!maxTasksElements.isEmpty()) {
- portDTO.setConcurrentlySchedulableTaskCount(Integer.parseInt(maxTasksElements.get(0).getTextContent()));
- }
-
- final List<Element> userAccessControls = getChildrenByTagName(element, "userAccessControl");
- if (userAccessControls != null && !userAccessControls.isEmpty()) {
- final Set<String> users = new HashSet<>();
- portDTO.setUserAccessControl(users);
- for (final Element userElement : userAccessControls) {
- users.add(userElement.getTextContent());
- }
- }
-
- final List<Element> groupAccessControls = getChildrenByTagName(element, "groupAccessControl");
- if (groupAccessControls != null && !groupAccessControls.isEmpty()) {
- final Set<String> groups = new HashSet<>();
- portDTO.setGroupAccessControl(groups);
- for (final Element groupElement : groupAccessControls) {
- groups.add(groupElement.getTextContent());
- }
- }
-
- return portDTO;
- }
-
- public static RemoteProcessGroupPortDescriptor getRemoteProcessGroupPort(final Element element) {
- final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
-
- // What we have serialized is the ID of the Remote Process Group, followed by a dash ('-'), followed by
- // the actual ID of the port; we want to get rid of the remote process group id.
- String id = getString(element, "id");
- if (id.length() > 37) {
- id = id.substring(37);
- }
-
- descriptor.setId(id);
- descriptor.setName(getString(element, "name"));
- descriptor.setComments(getString(element, "comments"));
- descriptor.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
- descriptor.setUseCompression(getBoolean(element, "useCompression"));
- descriptor.setTransmitting("RUNNING".equalsIgnoreCase(getString(element, "scheduledState")));
-
- return descriptor;
- }
-
- public static ProcessorDTO getProcessor(final Element element, final StringEncryptor encryptor) {
- final ProcessorDTO dto = new ProcessorDTO();
-
- dto.setId(getString(element, "id"));
- dto.setName(getString(element, "name"));
- dto.setType(getString(element, "class"));
- dto.setPosition(getPosition(DomUtils.getChild(element, "position")));
- dto.setStyle(getStyle(DomUtils.getChild(element, "styles")));
-
- final ProcessorConfigDTO configDto = new ProcessorConfigDTO();
- dto.setConfig(configDto);
- configDto.setComments(getString(element, "comment"));
- configDto.setAnnotationData(getString(element, "annotationData"));
- configDto.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
- final String schedulingPeriod = getString(element, "schedulingPeriod");
- configDto.setSchedulingPeriod(schedulingPeriod);
- configDto.setPenaltyDuration(getString(element, "penalizationPeriod"));
- configDto.setYieldDuration(getString(element, "yieldPeriod"));
- configDto.setBulletinLevel(getString(element, "bulletinLevel"));
- configDto.setLossTolerant(getBoolean(element, "lossTolerant"));
- final ScheduledState scheduledState = getScheduledState(element);
- dto.setState(scheduledState.toString());
-
- // handle scheduling strategy
- final String schedulingStrategyName = getString(element, "schedulingStrategy");
- if (schedulingStrategyName == null || schedulingStrategyName.trim().isEmpty()) {
- configDto.setSchedulingStrategy(SchedulingStrategy.TIMER_DRIVEN.name());
- } else {
- configDto.setSchedulingStrategy(schedulingStrategyName.trim());
- }
-
- final Long runDurationNanos = getOptionalLong(element, "runDurationNanos");
- if (runDurationNanos != null) {
- configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos));
- }
-
- final LinkedHashMap<String, String> properties = new LinkedHashMap<>();
- final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
- for (final Element propertyElement : propertyNodeList) {
- final String name = getString(propertyElement, "name");
- final String value = decrypt(getString(propertyElement, "value"), encryptor);
- properties.put(name, value);
- }
- configDto.setProperties(properties);
-
- final Set<String> autoTerminatedRelationships = new HashSet<>();
- final List<Element> autoTerminateList = getChildrenByTagName(element, "autoTerminatedRelationship");
- for (final Element autoTerminateElement : autoTerminateList) {
- autoTerminatedRelationships.add(autoTerminateElement.getTextContent());
- }
- configDto.setAutoTerminatedRelationships(autoTerminatedRelationships);
-
- return dto;
- }
-
- private static String getString(final Element element, final String childElementName) {
- final List<Element> nodeList = getChildrenByTagName(element, childElementName);
- if (nodeList == null || nodeList.isEmpty()) {
- return null;
- }
- final Element childElement = nodeList.get(0);
- return childElement.getTextContent();
- }
-
- private static Integer getOptionalInt(final Element element, final String childElementName) {
- final List<Element> nodeList = getChildrenByTagName(element, childElementName);
- if (nodeList == null || nodeList.isEmpty()) {
- return null;
- }
- final Element childElement = nodeList.get(0);
- final String val = childElement.getTextContent();
- if (val == null) {
- return null;
- }
- return Integer.parseInt(val);
- }
-
- private static Long getOptionalLong(final Element element, final String childElementName) {
- final List<Element> nodeList = getChildrenByTagName(element, childElementName);
- if (nodeList == null || nodeList.isEmpty()) {
- return null;
- }
- final Element childElement = nodeList.get(0);
- final String val = childElement.getTextContent();
- if (val == null) {
- return null;
- }
- return Long.parseLong(val);
- }
-
- private static int getInt(final Element element, final String childElementName) {
- return Integer.parseInt(getString(element, childElementName));
- }
-
- private static long getLong(final Element element, final String childElementName) {
- return Long.parseLong(getString(element, childElementName));
- }
-
- private static boolean getBoolean(final Element element, final String childElementName) {
- return Boolean.parseBoolean(getString(element, childElementName));
- }
-
- private static ScheduledState getScheduledState(final Element element) {
- return ScheduledState.valueOf(getString(element, "scheduledState"));
- }
-
- private static List<Element> getChildrenByTagName(final Element element, final String childElementName) {
- return DomUtils.getChildElementsByTagName(element, childElementName);
- }
-
- private static String decrypt(final String value, final StringEncryptor encryptor) {
- if (value != null && value.startsWith(FlowSerializer.ENC_PREFIX) && value.endsWith(FlowSerializer.ENC_SUFFIX)) {
- return encryptor.decrypt(value.substring(FlowSerializer.ENC_PREFIX.length(), value.length() - FlowSerializer.ENC_SUFFIX.length()));
- } else {
- return value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java
deleted file mode 100644
index f1ee760..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializationException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-/**
- * Represents the exceptional case when flow configuration is malformed and
- * therefore, cannot be serialized or deserialized.
- *
- * @author unattributed
- */
-public class FlowSerializationException extends RuntimeException {
-
- private static final long serialVersionUID = 128934798237L;
-
- public FlowSerializationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
- public FlowSerializationException(Throwable cause) {
- super(cause);
- }
-
- public FlowSerializationException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public FlowSerializationException(String message) {
- super(message);
- }
-
- public FlowSerializationException() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java
deleted file mode 100644
index 331b26c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.OutputStream;
-
-/**
- * Serializes the flow configuration of a controller instance to an output
- * stream.
- *
- * @author unattributed
- */
-public interface FlowSerializer {
-
- public static final String ENC_PREFIX = "enc{";
- public static final String ENC_SUFFIX = "}";
-
- /**
- * Serializes the flow configuration of a controller instance.
- *
- * @param controller a controller
- * @param os an output stream to write the configuration to
- *
- * @throws FlowSerializationException if serialization failed
- */
- void serialize(FlowController controller, OutputStream os) throws FlowSerializationException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
deleted file mode 100644
index 706ac46..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizationException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-/**
- * Represents the exceptional case when a controller managing an existing flow
- * fails to fully load a different flow.
- *
- * @author unattributed
- */
-public class FlowSynchronizationException extends RuntimeException {
-
- private static final long serialVersionUID = 109234802938L;
-
- public FlowSynchronizationException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-
- public FlowSynchronizationException(Throwable cause) {
- super(cause);
- }
-
- public FlowSynchronizationException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public FlowSynchronizationException(String message) {
- super(message);
- }
-
- public FlowSynchronizationException() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java
deleted file mode 100644
index f6889fe..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowSynchronizer.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import org.apache.nifi.cluster.protocol.DataFlow;
-import org.apache.nifi.encrypt.StringEncryptor;
-
-/**
- * @author unattributed
- */
-public interface FlowSynchronizer {
-
- /**
- * Synchronizes the given controller with the given flow configuration. If
- * loading the proposed flow configuration would cause the controller to
- * orphan flow files, then an UninheritableFlowException is thrown.
- *
- * If the FlowSynchronizationException is thrown, then the controller may
- * have changed some of its state and should no longer be used.
- *
- * @param controller the flow controller
- * @param dataFlow the flow to load the controller with. If the flow is null
- * or zero length, then the controller must not have a flow or else an
- * UninheritableFlowException will be thrown.
- * @param encryptor used for the encryption/decryption of sensitive property
- * values
- *
- * @throws FlowSerializationException if proposed flow is not a valid flow
- * configuration file
- * @throws UninheritableFlowException if the proposed flow cannot be loaded
- * by the controller because in doing so would risk orphaning flow files
- * @throws FlowSynchronizationException if updates to the controller failed.
- * If this exception is thrown, then the controller should be considered
- * unsafe to be used
- */
- void sync(FlowController controller, DataFlow dataFlow, StringEncryptor encryptor)
- throws FlowSerializationException, UninheritableFlowException, FlowSynchronizationException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
deleted file mode 100644
index 42d7f1c..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowUnmarshaller.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Objects;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.web.api.dto.FlowSnippetDTO;
-import org.apache.nifi.web.api.dto.ProcessGroupDTO;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-
-public class FlowUnmarshaller {
-
- /**
- * Interprets the given byte array as an XML document that conforms to the
- * Flow Configuration schema and returns a FlowSnippetDTO representing the
- * flow
- *
- * @param flowContents
- * @param encryptor
- * @return
- * @throws NullPointerException if <code>flowContents</code> is null
- * @throws IOException
- * @throws SAXException
- * @throws ParserConfigurationException
- */
- public static FlowSnippetDTO unmarshal(final byte[] flowContents, final StringEncryptor encryptor) throws IOException, SAXException, ParserConfigurationException {
- if (Objects.requireNonNull(flowContents).length == 0) {
- return new FlowSnippetDTO();
- }
-
- final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
- final DocumentBuilder docBuilder = dbf.newDocumentBuilder();
-
- final Document document = docBuilder.parse(new ByteArrayInputStream(flowContents));
- final FlowSnippetDTO flowDto = new FlowSnippetDTO();
-
- final NodeList nodeList = document.getElementsByTagName("rootGroup");
- if (nodeList.getLength() == 0) {
- return flowDto;
- }
- if (nodeList.getLength() > 1) {
- throw new IllegalArgumentException("Contents contain multiple rootGroup elements");
- }
-
- final Set<ProcessGroupDTO> rootGroupSet = new HashSet<>();
- flowDto.setProcessGroups(rootGroupSet);
- rootGroupSet.add(FlowFromDOMFactory.getProcessGroup(null, (Element) nodeList.item(0), encryptor));
-
- return flowDto;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java
deleted file mode 100644
index 3a9662e..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/SnippetManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.nifi.stream.io.ByteArrayInputStream;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
-import org.apache.nifi.stream.io.DataOutputStream;
-import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.persistence.StandardSnippetDeserializer;
-import org.apache.nifi.persistence.StandardSnippetSerializer;
-
-public class SnippetManager {
-
- private final ConcurrentMap<String, StandardSnippet> snippetMap = new ConcurrentHashMap<>();
-
- public void addSnippet(final StandardSnippet snippet) {
- final StandardSnippet oldSnippet = this.snippetMap.putIfAbsent(snippet.getId(), snippet);
- if (oldSnippet != null) {
- throw new IllegalStateException("Snippet with ID " + snippet.getId() + " already exists");
- }
- }
-
- public void removeSnippet(final StandardSnippet snippet) {
- if (!snippetMap.remove(snippet.getId(), snippet)) {
- throw new IllegalStateException("Snippet is not contained in this SnippetManager");
- }
- }
-
- public StandardSnippet getSnippet(final String identifier) {
- return snippetMap.get(identifier);
- }
-
- public Collection<StandardSnippet> getSnippets() {
- return snippetMap.values();
- }
-
- public void clear() {
- snippetMap.clear();
- }
-
- public static List<StandardSnippet> parseBytes(final byte[] bytes) {
- final List<StandardSnippet> snippets = new ArrayList<>();
-
- try (final InputStream rawIn = new ByteArrayInputStream(bytes);
- final DataInputStream in = new DataInputStream(rawIn)) {
- final int length = in.readInt();
- final byte[] buffer = new byte[length];
- StreamUtils.fillBuffer(in, buffer, true);
- final StandardSnippet snippet = StandardSnippetDeserializer.deserialize(new ByteArrayInputStream(buffer));
- snippets.add(snippet);
- } catch (final IOException e) {
- throw new RuntimeException("Failed to parse bytes", e); // should never happen because of streams being used
- }
-
- return snippets;
- }
-
- public byte[] export() {
- try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- final DataOutputStream dos = new DataOutputStream(baos)) {
- for (final StandardSnippet snippet : getSnippets()) {
- final byte[] bytes = StandardSnippetSerializer.serialize(snippet);
- dos.writeInt(bytes.length);
- dos.write(bytes);
- }
-
- return baos.toByteArray();
- } catch (final IOException e) {
- // won't happen
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java
deleted file mode 100644
index 2899a85..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardCounter.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import org.apache.nifi.controller.Counter;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class StandardCounter implements Counter {
-
- private final String identifier;
- private final String context;
- private final String name;
- private final AtomicLong value;
-
- public StandardCounter(final String identifier, final String context, final String name) {
- this.identifier = identifier;
- this.context = context;
- this.name = name;
- this.value = new AtomicLong(0L);
- }
-
- public void adjust(final long delta) {
- this.value.addAndGet(delta);
- }
-
- public String getName() {
- return name;
- }
-
- public long getValue() {
- return this.value.get();
- }
-
- public String getContext() {
- return context;
- }
-
- public String getIdentifier() {
- return identifier;
- }
-
- public void reset() {
- this.value.set(0);
- }
-
- @Override
- public String toString() {
- return "Counter[identifier=" + identifier + ", context=" + context + ", name=" + name + ", value=" + value + ']';
- }
-
- public static UnmodifiableCounter unmodifiableCounter(final Counter counter) {
- return new UnmodifiableCounter(counter);
- }
-
- static class UnmodifiableCounter extends StandardCounter {
-
- private final Counter counter;
-
- public UnmodifiableCounter(final Counter counter) {
- super(counter.getIdentifier(), counter.getContext(), counter.getName());
- this.counter = counter;
- }
-
- @Override
- public void adjust(long delta) {
- throw new UnsupportedOperationException("Cannot modify value of UnmodifiableCounter");
- }
-
- @Override
- public String getName() {
- return counter.getName();
- }
-
- @Override
- public long getValue() {
- return counter.getValue();
- }
-
- @Override
- public String getContext() {
- return counter.getContext();
- }
-
- @Override
- public String getIdentifier() {
- return counter.getIdentifier();
- }
-
- @Override
- public String toString() {
- return counter.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f6d9354b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
deleted file mode 100644
index e08a94d..0000000
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-import java.io.BufferedOutputStream;
-import java.io.OutputStream;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.OutputKeys;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.TransformerFactoryConfigurationError;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.connectable.ConnectableType;
-import org.apache.nifi.connectable.Connection;
-import org.apache.nifi.connectable.Funnel;
-import org.apache.nifi.connectable.Port;
-import org.apache.nifi.connectable.Position;
-import org.apache.nifi.connectable.Size;
-import org.apache.nifi.controller.label.Label;
-import org.apache.nifi.encrypt.StringEncryptor;
-import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.groups.RemoteProcessGroup;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.remote.RootGroupPort;
-
-import org.w3c.dom.DOMException;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-
-/**
- * Serializes a Flow Controller as XML to an output stream.
- *
- * NOT THREAD-SAFE.
- */
-public class StandardFlowSerializer implements FlowSerializer {
-
- private final StringEncryptor encryptor;
-
- public StandardFlowSerializer(final StringEncryptor encryptor) {
- this.encryptor = encryptor;
- }
-
- @Override
- public void serialize(final FlowController controller, final OutputStream os) throws FlowSerializationException {
- try {
- // create a new, empty document
- final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
- final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
- final Document doc = docBuilder.newDocument();
-
- // populate document with controller state
- final Element rootNode = doc.createElement("flowController");
- doc.appendChild(rootNode);
- addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
- addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
- addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup");
-
- final DOMSource domSource = new DOMSource(doc);
- final StreamResult streamResult = new StreamResult(new BufferedOutputStream(os));
-
- // configure the transformer and convert the DOM
- final TransformerFactory transformFactory = TransformerFactory.newInstance();
- final Transformer transformer = transformFactory.newTransformer();
- transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
- transformer.setOutputProperty(OutputKeys.INDENT, "yes");
-
- // transform the document to byte stream
- transformer.transform(domSource, streamResult);
-
- } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) {
- throw new FlowSerializationException(e);
- }
- }
-
- private void addSize(final Element parentElement, final Size size) {
- final Element element = parentElement.getOwnerDocument().createElement("size");
- element.setAttribute("width", String.valueOf(size.getWidth()));
- element.setAttribute("height", String.valueOf(size.getHeight()));
- parentElement.appendChild(element);
- }
-
- private void addPosition(final Element parentElement, final Position position) {
- addPosition(parentElement, position, "position");
- }
-
- private void addPosition(final Element parentElement, final Position position, final String elementName) {
- final Element element = parentElement.getOwnerDocument().createElement(elementName);
- element.setAttribute("x", String.valueOf(position.getX()));
- element.setAttribute("y", String.valueOf(position.getY()));
- parentElement.appendChild(element);
- }
-
- private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement(elementName);
- parentElement.appendChild(element);
- addTextElement(element, "id", group.getIdentifier());
- addTextElement(element, "name", group.getName());
- addPosition(element, group.getPosition());
- addTextElement(element, "comment", group.getComments());
-
- for (final ProcessorNode processor : group.getProcessors()) {
- addProcessor(element, processor);
- }
-
- if (group.isRootGroup()) {
- for (final Port port : group.getInputPorts()) {
- addRootGroupPort(element, (RootGroupPort) port, "inputPort");
- }
-
- for (final Port port : group.getOutputPorts()) {
- addRootGroupPort(element, (RootGroupPort) port, "outputPort");
- }
- } else {
- for (final Port port : group.getInputPorts()) {
- addPort(element, port, "inputPort");
- }
-
- for (final Port port : group.getOutputPorts()) {
- addPort(element, port, "outputPort");
- }
- }
-
- for (final Label label : group.getLabels()) {
- addLabel(element, label);
- }
-
- for (final Funnel funnel : group.getFunnels()) {
- addFunnel(element, funnel);
- }
-
- for (final ProcessGroup childGroup : group.getProcessGroups()) {
- addProcessGroup(element, childGroup, "processGroup");
- }
-
- for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) {
- addRemoteProcessGroup(element, remoteRef);
- }
-
- for (final Connection connection : group.getConnections()) {
- addConnection(element, connection);
- }
- }
-
- private void addStyle(final Element parentElement, final Map<String, String> style) {
- final Element element = parentElement.getOwnerDocument().createElement("styles");
-
- for (final Map.Entry<String, String> entry : style.entrySet()) {
- final Element styleElement = parentElement.getOwnerDocument().createElement("style");
- styleElement.setAttribute("name", entry.getKey());
- styleElement.setTextContent(entry.getValue());
- element.appendChild(styleElement);
- }
-
- parentElement.appendChild(element);
- }
-
- private void addLabel(final Element parentElement, final Label label) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("label");
- parentElement.appendChild(element);
- addTextElement(element, "id", label.getIdentifier());
-
- addPosition(element, label.getPosition());
- addSize(element, label.getSize());
- addStyle(element, label.getStyle());
-
- addTextElement(element, "value", label.getValue());
- parentElement.appendChild(element);
- }
-
- private void addFunnel(final Element parentElement, final Funnel funnel) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("funnel");
- parentElement.appendChild(element);
- addTextElement(element, "id", funnel.getIdentifier());
- addPosition(element, funnel.getPosition());
- }
-
- private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("remoteProcessGroup");
- parentElement.appendChild(element);
- addTextElement(element, "id", remoteRef.getIdentifier());
- addTextElement(element, "name", remoteRef.getName());
- addPosition(element, remoteRef.getPosition());
- addTextElement(element, "comment", remoteRef.getComments());
- addTextElement(element, "url", remoteRef.getTargetUri().toString());
- addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout());
- addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration());
- addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting()));
-
- for (final RemoteGroupPort port : remoteRef.getInputPorts()) {
- if (port.hasIncomingConnection()) {
- addRemoteGroupPort(element, port, "inputPort");
- }
- }
-
- for (final RemoteGroupPort port : remoteRef.getOutputPorts()) {
- if (!port.getConnections().isEmpty()) {
- addRemoteGroupPort(element, port, "outputPort");
- }
- }
-
- parentElement.appendChild(element);
- }
-
- private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPort port, final String elementName) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement(elementName);
- parentElement.appendChild(element);
- addTextElement(element, "id", port.getIdentifier());
- addTextElement(element, "name", port.getName());
- addPosition(element, port.getPosition());
- addTextElement(element, "comments", port.getComments());
- addTextElement(element, "scheduledState", port.getScheduledState().name());
- addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
- addTextElement(element, "useCompression", String.valueOf(((RemoteGroupPort) port).isUseCompression()));
-
- parentElement.appendChild(element);
- }
-
- private void addPort(final Element parentElement, final Port port, final String elementName) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement(elementName);
- parentElement.appendChild(element);
- addTextElement(element, "id", port.getIdentifier());
- addTextElement(element, "name", port.getName());
- addPosition(element, port.getPosition());
- addTextElement(element, "comments", port.getComments());
- addTextElement(element, "scheduledState", port.getScheduledState().name());
-
- parentElement.appendChild(element);
- }
-
- private void addRootGroupPort(final Element parentElement, final RootGroupPort port, final String elementName) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement(elementName);
- parentElement.appendChild(element);
- addTextElement(element, "id", port.getIdentifier());
- addTextElement(element, "name", port.getName());
- addPosition(element, port.getPosition());
- addTextElement(element, "comments", port.getComments());
- addTextElement(element, "scheduledState", port.getScheduledState().name());
- addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks()));
- for (final String user : port.getUserAccessControl()) {
- addTextElement(element, "userAccessControl", user);
- }
- for (final String group : port.getGroupAccessControl()) {
- addTextElement(element, "groupAccessControl", group);
- }
-
- parentElement.appendChild(element);
- }
-
- private void addProcessor(final Element parentElement, final ProcessorNode processor) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("processor");
- parentElement.appendChild(element);
- addTextElement(element, "id", processor.getIdentifier());
- addTextElement(element, "name", processor.getName());
-
- addPosition(element, processor.getPosition());
- addStyle(element, processor.getStyle());
-
- addTextElement(element, "comment", processor.getComments());
- addTextElement(element, "class", processor.getProcessor().getClass().getCanonicalName());
- addTextElement(element, "maxConcurrentTasks", processor.getMaxConcurrentTasks());
- addTextElement(element, "schedulingPeriod", processor.getSchedulingPeriod());
- addTextElement(element, "penalizationPeriod", processor.getPenalizationPeriod());
- addTextElement(element, "yieldPeriod", processor.getYieldPeriod());
- addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString());
- addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
- addTextElement(element, "scheduledState", processor.getScheduledState().name());
- addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
- addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
-
- // properties.
- for (final Map.Entry<PropertyDescriptor, String> entry : processor.getProperties().entrySet()) {
- final PropertyDescriptor descriptor = entry.getKey();
- String value = entry.getValue();
-
- if (value != null && descriptor.isSensitive()) {
- value = ENC_PREFIX + encryptor.encrypt(value) + ENC_SUFFIX;
- }
-
- if (value == null) {
- value = descriptor.getDefaultValue();
- }
-
- final Element propElement = doc.createElement("property");
- addTextElement(propElement, "name", descriptor.getName());
- if (value != null) {
- addTextElement(propElement, "value", value);
- }
-
- element.appendChild(propElement);
- }
-
- final String annotationData = processor.getAnnotationData();
- if (annotationData != null) {
- addTextElement(element, "annotationData", annotationData);
- }
-
- for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
- addTextElement(element, "autoTerminatedRelationship", rel.getName());
- }
- }
-
- private void addConnection(final Element parentElement, final Connection connection) {
- final Document doc = parentElement.getOwnerDocument();
- final Element element = doc.createElement("connection");
- parentElement.appendChild(element);
- addTextElement(element, "id", connection.getIdentifier());
- addTextElement(element, "name", connection.getName());
-
- final Element bendPointsElement = doc.createElement("bendPoints");
- element.appendChild(bendPointsElement);
- for (final Position bendPoint : connection.getBendPoints()) {
- addPosition(bendPointsElement, bendPoint, "bendPoint");
- }
-
- addTextElement(element, "labelIndex", connection.getLabelIndex());
- addTextElement(element, "zIndex", connection.getZIndex());
-
- final String sourceId = connection.getSource().getIdentifier();
- final ConnectableType sourceType = connection.getSource().getConnectableType();
- final String sourceGroupId;
- if (sourceType == ConnectableType.REMOTE_OUTPUT_PORT) {
- sourceGroupId = ((RemoteGroupPort) connection.getSource()).getRemoteProcessGroup().getIdentifier();
- } else {
- sourceGroupId = connection.getSource().getProcessGroup().getIdentifier();
- }
-
- final ConnectableType destinationType = connection.getDestination().getConnectableType();
- final String destinationId = connection.getDestination().getIdentifier();
- final String destinationGroupId;
- if (destinationType == ConnectableType.REMOTE_INPUT_PORT) {
- destinationGroupId = ((RemoteGroupPort) connection.getDestination()).getRemoteProcessGroup().getIdentifier();
- } else {
- destinationGroupId = connection.getDestination().getProcessGroup().getIdentifier();
- }
-
- addTextElement(element, "sourceId", sourceId);
- addTextElement(element, "sourceGroupId", sourceGroupId);
- addTextElement(element, "sourceType", sourceType.toString());
-
- addTextElement(element, "destinationId", destinationId);
- addTextElement(element, "destinationGroupId", destinationGroupId);
- addTextElement(element, "destinationType", destinationType.toString());
-
- for (final Relationship relationship : connection.getRelationships()) {
- addTextElement(element, "relationship", relationship.getName());
- }
-
- addTextElement(element, "maxWorkQueueSize", connection.getFlowFileQueue().getBackPressureObjectThreshold());
- addTextElement(element, "maxWorkQueueDataSize", connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
-
- addTextElement(element, "flowFileExpiration", connection.getFlowFileQueue().getFlowFileExpiration());
- for (final FlowFilePrioritizer comparator : connection.getFlowFileQueue().getPriorities()) {
- final String className = comparator.getClass().getCanonicalName();
- addTextElement(element, "queuePrioritizerClass", className);
- }
-
- parentElement.appendChild(element);
- }
-
- private void addTextElement(final Element element, final String name, final long value) {
- addTextElement(element, name, String.valueOf(value));
- }
-
- private void addTextElement(final Element element, final String name, final String value) {
- final Document doc = element.getOwnerDocument();
- final Element toAdd = doc.createElement(name);
- toAdd.setTextContent(value);
- element.appendChild(toAdd);
- }
-
-}