You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/01/08 18:13:53 UTC
[05/50] nifi git commit: NIFI-4436: Added additional endpoints;
bug fixes
NIFI-4436: Added additional endpoints; bug fixes
Signed-off-by: Matt Gilman <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6aa8b5c6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6aa8b5c6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6aa8b5c6
Branch: refs/heads/master
Commit: 6aa8b5c61c734ce56aad7816b40df88d8316feeb
Parents: 7a0a900
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Oct 30 16:35:59 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Mon Jan 8 12:44:52 2018 -0500
----------------------------------------------------------------------
.../nifi/web/api/entity/ProcessGroupEntity.java | 17 +-
.../org/apache/nifi/groups/ProcessGroup.java | 3 +-
.../apache/nifi/registry/flow/FlowRegistry.java | 30 ++
.../nifi/registry/flow/FlowRegistryClient.java | 6 +
.../apache/nifi/controller/FlowController.java | 103 ++++
.../controller/StandardFlowSynchronizer.java | 36 +-
.../serialization/StandardFlowSerializer.java | 29 +-
.../nifi/fingerprint/FingerprintFactory.java | 21 +
.../nifi/groups/StandardProcessGroup.java | 134 ++++--
.../registry/flow/FileBasedFlowRegistry.java | 478 +++++++++++++++++++
.../flow/FileBasedFlowRegistryClient.java | 435 -----------------
.../flow/StandardFlowRegistryClient.java | 75 +++
.../flow/mapping/NiFiRegistryFlowMapper.java | 95 +++-
.../java/org/apache/nifi/util/BundleUtils.java | 48 +-
.../src/main/resources/FlowConfiguration.xsd | 17 +
.../src/main/resources/nifi-context.xml | 4 +-
.../service/mock/MockProcessGroup.java | 2 +-
.../fingerprint/FingerprintFactoryTest.java | 8 +
.../nifi/registry/flow/FlowRegistryUtils.java | 74 +++
.../org/apache/nifi/web/NiFiServiceFacade.java | 12 +-
.../nifi/web/StandardNiFiServiceFacade.java | 266 ++++++-----
.../nifi/web/api/ProcessGroupResource.java | 173 ++++---
.../apache/nifi/web/api/VersionsResource.java | 151 +++---
.../org/apache/nifi/web/api/dto/DtoFactory.java | 10 +
.../nifi/web/controller/ControllerFacade.java | 4 +
.../apache/nifi/web/dao/ProcessGroupDAO.java | 3 +-
.../org/apache/nifi/web/dao/RegistryDAO.java | 35 ++
.../nifi/web/dao/impl/FlowRegistryDAO.java | 66 +++
.../web/dao/impl/StandardProcessGroupDAO.java | 5 +-
.../src/main/resources/nifi-web-api-context.xml | 4 +
30 files changed, 1578 insertions(+), 766 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
index 83af9d8..1e2a4b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ProcessGroupEntity.java
@@ -16,11 +16,13 @@
*/
package org.apache.nifi.web.api.entity;
-import io.swagger.annotations.ApiModelProperty;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
-import javax.xml.bind.annotation.XmlRootElement;
+import io.swagger.annotations.ApiModelProperty;
/**
* A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ProcessGroupDTO.
@@ -30,6 +32,7 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
private ProcessGroupDTO component;
private ProcessGroupStatusDTO status;
+ private VersionedFlowSnapshot versionedFlowSnapshot;
private Integer runningCount;
private Integer stoppedCount;
@@ -46,10 +49,12 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
*
* @return The ProcessGroupDTO object
*/
+ @Override
public ProcessGroupDTO getComponent() {
return component;
}
+ @Override
public void setComponent(ProcessGroupDTO component) {
this.component = component;
}
@@ -180,4 +185,12 @@ public class ProcessGroupEntity extends ComponentEntity implements Permissible<P
this.inactiveRemotePortCount = inactiveRemotePortCount;
}
+    /**
+     * @return the VersionedFlowSnapshot held by this entity, or null if none has been set
+     */
+ @ApiModelProperty(value = "Returns the Versioned Flow that describes the contents of the Versioned Flow to be imported", readOnly = true)
+ public VersionedFlowSnapshot getVersionedFlowSnapshot() {
+ return versionedFlowSnapshot;
+ }
+
+    /**
+     * Stores the given snapshot on this entity so that it is returned by getVersionedFlowSnapshot().
+     *
+     * @param versionedFlowSnapshot the snapshot to hold; no validation is performed here
+     */
+ public void setVersionedFlowSnapshot(VersionedFlowSnapshot versionedFlowSnapshot) {
+ this.versionedFlowSnapshot = versionedFlowSnapshot;
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index d335461..16b4b5e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -783,8 +783,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param verifyNotDirty whether or not to verify that the Process Group is not 'dirty'. If this value is <code>true</code>,
* and the Process Group has been modified since it was last synchronized with the Flow Registry, then this method will
* throw an IllegalStateException
+ * @param updateSettings whether or not to update the process group's name and positions
*/
- void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty);
+ void updateFlow(VersionedFlowSnapshot proposedSnapshot, String componentIdSeed, boolean verifyNotDirty, boolean updateSettings);
/**
* Verifies a template with the specified name can be created.
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
index 962a940..4efff94 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistry.java
@@ -24,6 +24,22 @@ import java.io.IOException;
import java.util.Set;
public interface FlowRegistry {
+ /**
+ * @return the ID of the Flow Registry
+ */
+ String getIdentifier();
+
+ /**
+ * @return the description of the Flow Registry
+ */
+ String getDescription();
+
+ /**
+ * Updates the Flow Registry's description
+ *
+ * @param description the description of the Flow Registry
+ */
+ void setDescription(String description);
/**
* @return the URL of the Flow Registry
@@ -31,11 +47,25 @@ public interface FlowRegistry {
String getURL();
/**
+ * Updates the Flow Registry's URL
+ *
+ * @param url the URL of the Flow Registry
+ */
+ void setURL(String url);
+
+ /**
* @return the name of the Flow Registry
*/
String getName();
/**
+ * Updates the name of the Flow Registry
+ *
+ * @param name the name of the Flow Registry
+ */
+ void setName(String name);
+
+ /**
* Gets the buckets for the specified user.
*
* @param user current user
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
index 83f66dc..77c2761 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/registry/flow/FlowRegistryClient.java
@@ -34,4 +34,10 @@ public interface FlowRegistryClient {
}
Set<String> getRegistryIdentifiers();
+
+ void addFlowRegistry(FlowRegistry registry);
+
+ FlowRegistry addFlowRegistry(String registryId, String registryName, String registryUrl, String description);
+
+ FlowRegistry removeFlowRegistry(String registryId);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 242ef6a..5ed5b6e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -165,6 +165,8 @@ import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.registry.ComponentVariableRegistry;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
+import org.apache.nifi.registry.flow.VersionedConnection;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.registry.variable.StandardComponentVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
@@ -2128,6 +2130,13 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+    /**
+     * Checks that the bundle declared by a component of a Versioned Flow is one of the bundles available for that component's type.
+     *
+     * @param requiredBundle the bundle declared by the versioned component
+     * @param supportedBundles the bundle coordinates that are available for the component's type
+     * @throws IllegalStateException if the required bundle is not among the supported bundles
+     */
+    private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set<BundleCoordinate> supportedBundles) {
+        final BundleCoordinate coordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
+        if (supportedBundles.contains(coordinate)) {
+            return;
+        }
+
+        throw new IllegalStateException("Unsupported bundle: " + coordinate);
+    }
+
private void verifyProcessorsInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
if (templateContents.getProcessors() != null) {
templateContents.getProcessors().forEach(processor -> {
@@ -2150,6 +2159,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+    /**
+     * Recursively verifies that every Processor in the given versioned group, and in all of its descendant groups,
+     * declares a bundle and refers to a known type whose bundle is available.
+     *
+     * @param versionedFlow the versioned group to inspect
+     * @param supportedTypes maps each known Processor type to the bundle coordinates that provide it
+     * @throws IllegalArgumentException if a Processor does not specify a bundle
+     * @throws IllegalStateException if a Processor's type is unknown or its bundle is not available
+     */
+    private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+        if (versionedFlow.getProcessors() != null) {
+            versionedFlow.getProcessors().forEach(processor -> {
+                // the bundle is checked before the type, so a missing bundle is reported first
+                if (processor.getBundle() == null) {
+                    throw new IllegalArgumentException("Processor bundle must be specified.");
+                }
+
+                if (!supportedTypes.containsKey(processor.getType())) {
+                    throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
+                }
+
+                verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType()));
+            });
+        }
+
+        if (versionedFlow.getProcessGroups() != null) {
+            versionedFlow.getProcessGroups().forEach(childGroup -> verifyProcessorsInVersionedFlow(childGroup, supportedTypes));
+        }
+    }
+
private void verifyControllerServicesInSnippet(final FlowSnippetDTO templateContents, final Map<String, Set<BundleCoordinate>> supportedTypes) {
if (templateContents.getControllerServices() != null) {
templateContents.getControllerServices().forEach(controllerService -> {
@@ -2172,6 +2203,28 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+    /**
+     * Recursively verifies that every Controller Service in the given versioned group, and in all of its descendant
+     * groups, refers to a known type and declares a bundle that is available for that type.
+     *
+     * @param versionedFlow the versioned group to inspect
+     * @param supportedTypes maps each known Controller Service type to the bundle coordinates that provide it
+     * @throws IllegalStateException if a Controller Service's type is unknown or its bundle is not available
+     * @throws IllegalArgumentException if a Controller Service of a known type does not specify a bundle
+     */
+    private void verifyControllerServicesInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
+        if (versionedFlow.getControllerServices() != null) {
+            versionedFlow.getControllerServices().forEach(service -> {
+                // the type is checked before the bundle, so an unknown type is reported first
+                if (!supportedTypes.containsKey(service.getType())) {
+                    throw new IllegalStateException("Invalid Controller Service Type: " + service.getType());
+                }
+
+                if (service.getBundle() == null) {
+                    throw new IllegalArgumentException("Controller Service bundle must be specified.");
+                }
+
+                verifyBundleInVersionedFlow(service.getBundle(), supportedTypes.get(service.getType()));
+            });
+        }
+
+        if (versionedFlow.getProcessGroups() != null) {
+            versionedFlow.getProcessGroups().forEach(childGroup -> verifyControllerServicesInVersionedFlow(childGroup, supportedTypes));
+        }
+    }
+
public void verifyComponentTypesInSnippet(final FlowSnippetDTO templateContents) {
final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
@@ -2210,6 +2263,44 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
+    /**
+     * Verifies that every Processor, Controller Service, and FlowFile Prioritizer referenced by the given Versioned
+     * Flow (including all descendant groups) is a type known to the ExtensionManager.
+     *
+     * @param versionedFlow the versioned flow whose component types should be verified
+     * @throws IllegalArgumentException if a Processor or Controller Service does not specify a bundle
+     * @throws IllegalStateException if a component type, bundle, or prioritizer is not available
+     */
+    public void verifyComponentTypesInSnippet(final VersionedProcessGroup versionedFlow) {
+        final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
+        for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
+            final String name = c.getName();
+            processorClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+        }
+        verifyProcessorsInVersionedFlow(versionedFlow, processorClasses);
+
+        final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
+        for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
+            final String name = c.getName();
+            controllerServiceClasses.put(name, ExtensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
+        }
+        verifyControllerServicesInVersionedFlow(versionedFlow, controllerServiceClasses);
+
+        final Set<String> prioritizerClasses = new HashSet<>();
+        for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
+            prioritizerClasses.add(c.getName());
+        }
+
+        // The processor and controller service checks above tolerate absent (null) collections;
+        // do the same here so that a sparse snapshot does not fail with a NullPointerException.
+        final Set<VersionedConnection> allConns = new HashSet<>();
+        if (versionedFlow.getConnections() != null) {
+            allConns.addAll(versionedFlow.getConnections());
+        }
+        if (versionedFlow.getProcessGroups() != null) {
+            for (final VersionedProcessGroup childGroup : versionedFlow.getProcessGroups()) {
+                allConns.addAll(findAllConnections(childGroup));
+            }
+        }
+
+        for (final VersionedConnection conn : allConns) {
+            final List<String> prioritizers = conn.getPrioritizers();
+            if (prioritizers != null) {
+                for (final String prioritizer : prioritizers) {
+                    if (!prioritizerClasses.contains(prioritizer)) {
+                        throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer);
+                    }
+                }
+            }
+        }
+    }
+
/**
* <p>
* Verifies that the given DTO is valid, according to the following:
@@ -2270,6 +2361,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return conns;
}
+    /**
+     * Collects the connections of the given versioned group and of all of its descendant groups.
+     *
+     * @param group the versioned group at which to start
+     * @return a new set holding every connection found; empty if there are none
+     */
+    private Set<VersionedConnection> findAllConnections(final VersionedProcessGroup group) {
+        final Set<VersionedConnection> conns = new HashSet<>();
+
+        // collections may be absent on a sparse snapshot; treat null the same as empty
+        if (group.getConnections() != null) {
+            conns.addAll(group.getConnections());
+        }
+
+        if (group.getProcessGroups() != null) {
+            for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
+                conns.addAll(findAllConnections(childGroup));
+            }
+        }
+        return conns;
+    }
+
//
// Processor access
//
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index e879e38..5a7aeec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -85,6 +85,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.SimpleProcessLogger;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.RemoteGroupPort;
@@ -184,7 +185,10 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
try {
if (flowAlreadySynchronized) {
existingFlow = toBytes(controller);
- existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty() && controller.getAllReportingTasks().isEmpty() && controller.getAllControllerServices().isEmpty();
+ existingFlowEmpty = controller.getGroup(controller.getRootGroupId()).isEmpty()
+ && controller.getAllReportingTasks().isEmpty()
+ && controller.getAllControllerServices().isEmpty()
+ && controller.getFlowRegistryClient().getRegistryIdentifiers().isEmpty();
} else {
existingFlow = readFlowFromDisk();
if (existingFlow == null || existingFlow.length == 0) {
@@ -220,10 +224,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
unrootedControllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
}
+                    // registries are present only if a <registries> element exists and has at least one <flowRegistry> child
+                    final Element registriesElement = DomUtils.getChild(rootElement, "registries");
+                    final boolean registriesPresent = registriesElement != null
+                        && !DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry").isEmpty();
+
logger.trace("Parsing process group from DOM");
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor, encodingVersion);
- existingFlowEmpty = taskElements.isEmpty() && unrootedControllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
+                    // A flow that defines Flow Registries is not empty. This mirrors the in-memory check,
+                    // which requires the registry client to have no registry identifiers.
+                    existingFlowEmpty = taskElements.isEmpty()
+                        && unrootedControllerServiceElements.isEmpty()
+                        && isEmpty(rootGroupDto)
+                        && !registriesPresent;
logger.debug("Existing Flow Empty = {}", existingFlowEmpty);
}
}
@@ -318,6 +334,22 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// get the root group XML element
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
+            // when the controller has no flow of its own yet, register every <flowRegistry> found under <registries>
+            if (!flowAlreadySynchronized || existingFlowEmpty) {
+                final Element registriesElement = DomUtils.getChild(rootElement, "registries");
+                if (registriesElement != null) {
+                    for (final Element flowRegistryElement : DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry")) {
+                        final String registryId = getString(flowRegistryElement, "id");
+                        final String registryName = getString(flowRegistryElement, "name");
+                        final String registryUrl = getString(flowRegistryElement, "url");
+                        final String description = getString(flowRegistryElement, "description");
+
+                        controller.getFlowRegistryClient().addFlowRegistry(registryId, registryName, registryUrl, description);
+                    }
+                }
+            }
+
// if this controller isn't initialized or its empty, add the root group, otherwise update
final ProcessGroup rootGroup;
if (!flowAlreadySynchronized || existingFlowEmpty) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
index ecf2438..f921bc6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/serialization/StandardFlowSerializer.java
@@ -39,6 +39,8 @@ import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
+import org.apache.nifi.registry.flow.FlowRegistry;
+import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
@@ -74,7 +76,7 @@ import java.util.concurrent.TimeUnit;
*/
public class StandardFlowSerializer implements FlowSerializer {
- private static final String MAX_ENCODING_VERSION = "1.2";
+ private static final String MAX_ENCODING_VERSION = "1.3";
private final StringEncryptor encryptor;
@@ -98,6 +100,11 @@ public class StandardFlowSerializer implements FlowSerializer {
doc.appendChild(rootNode);
addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
+
+ final Element registriesElement = doc.createElement("registries");
+ rootNode.appendChild(registriesElement);
+
+ addFlowRegistries(registriesElement, controller.getFlowRegistryClient());
addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup", scheduledStateLookup);
// Add root-level controller services
@@ -130,6 +137,26 @@ public class StandardFlowSerializer implements FlowSerializer {
}
}
+    /**
+     * Appends one "flowRegistry" element to the given parent for each registry identifier reported by the client.
+     * Each element carries the registry's id, name, url, and description as child elements.
+     *
+     * @param parentElement the element that receives the "flowRegistry" children
+     * @param registryClient the client used to enumerate and look up Flow Registries
+     */
+    private void addFlowRegistries(final Element parentElement, final FlowRegistryClient registryClient) {
+        for (final String identifier : registryClient.getRegistryIdentifiers()) {
+            final FlowRegistry registry = registryClient.getFlowRegistry(identifier);
+
+            final Element registryElem = parentElement.getOwnerDocument().createElement("flowRegistry");
+            parentElement.appendChild(registryElem);
+
+            addStringElement(registryElem, "id", registry.getIdentifier());
+            addStringElement(registryElem, "name", registry.getName());
+            addStringElement(registryElem, "url", registry.getURL());
+            addStringElement(registryElem, "description", registry.getDescription());
+        }
+    }
+
+    /**
+     * Creates an element with the given name and text content and appends it to the given parent.
+     *
+     * @param parentElement the element that receives the new child
+     * @param elementName the tag name of the new child
+     * @param value the text content of the new child
+     */
+    private void addStringElement(final Element parentElement, final String elementName, final String value) {
+        final Element textElem = parentElement.getOwnerDocument().createElement(elementName);
+        textElem.setTextContent(value);
+        parentElement.appendChild(textElem);
+    }
+
private void addSize(final Element parentElement, final Size size) {
final Element element = parentElement.getOwnerDocument().createElement("size");
element.setAttribute("width", String.valueOf(size.getWidth()));
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
index 3aa5084..e1846a0 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/fingerprint/FingerprintFactory.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.stream.Stream;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
@@ -198,6 +199,21 @@ public class FingerprintFactory {
}
private StringBuilder addFlowControllerFingerprint(final StringBuilder builder, final Element flowControllerElem, final FlowController controller) {
+        // registries: fingerprint each <flowRegistry>; emit NO_VALUE when none are defined
+        final Element registriesElement = DomUtils.getChild(flowControllerElem, "registries");
+        boolean registryFingerprinted = false;
+        if (registriesElement != null) {
+            for (final Element flowRegistryElement : DomUtils.getChildElementsByTagName(registriesElement, "flowRegistry")) {
+                addFlowRegistryFingerprint(builder, flowRegistryElement);
+                registryFingerprinted = true;
+            }
+        }
+        if (!registryFingerprinted) {
+            builder.append("NO_VALUE");
+        }
+
// root group
final Element rootGroupElem = (Element) DomUtils.getChildNodesByTagName(flowControllerElem, "rootGroup").item(0);
addProcessGroupFingerprint(builder, rootGroupElem, controller);
@@ -265,6 +281,11 @@ public class FingerprintFactory {
return builder;
}
+    /**
+     * Appends the first value of the "id", "name", "url", and "description" children of the given flow registry
+     * element to the fingerprint, in that order.
+     *
+     * @param builder the fingerprint being built
+     * @param flowRegistryElement the "flowRegistry" element to fingerprint
+     * @return the given builder
+     */
+    private StringBuilder addFlowRegistryFingerprint(final StringBuilder builder, final Element flowRegistryElement) {
+        Stream.of("id", "name", "url", "description")
+            .map(childName -> DomUtils.getChildNodesByTagName(flowRegistryElement, childName))
+            .forEach(childNodes -> appendFirstValue(builder, childNodes));
+        return builder;
+    }
+
private StringBuilder addProcessGroupFingerprint(final StringBuilder builder, final Element processGroupElem, final FlowController controller) throws FingerprintException {
// id
appendFirstValue(builder, DomUtils.getChildNodesByTagName(processGroupElem, "id"));
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3b8117b..1d8652e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -72,7 +72,7 @@ import org.apache.nifi.registry.flow.Bundle;
import org.apache.nifi.registry.flow.ConnectableComponent;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
-import org.apache.nifi.registry.flow.RemoteFlowCoordinates;
+import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
import org.apache.nifi.registry.flow.StandardVersionControlInformation;
import org.apache.nifi.registry.flow.UnknownResourceException;
import org.apache.nifi.registry.flow.VersionControlInformation;
@@ -2835,11 +2835,14 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
+ @Override
public void disconnectVersionControl() {
writeLock.lock();
try {
- // TODO remove version component ids from each component (until another versioned PG is encountered)
this.versionControlInfo.set(null);
+
+ // remove version component ids from each component (until another versioned PG is encountered)
+ applyVersionedComponentIds(this, id -> null);
} finally {
writeLock.unlock();
}
@@ -2850,36 +2853,41 @@ public final class StandardProcessGroup implements ProcessGroup {
return;
}
- processGroup.setVersionedComponentId(versionedComponentIds.get(processGroup.getIdentifier()));
+ applyVersionedComponentIds(processGroup, versionedComponentIds::get);
+ }
+
+    /**
+     * Sets the versioned component id of the given group and of each of its components to the value that the
+     * lookup returns for that component's identifier. Child groups are processed recursively, but recursion
+     * stops at any child group that is itself under version control, because that group's components are
+     * tracked by its own versioned flow.
+     *
+     * @param processGroup the group whose components should be updated
+     * @param lookup maps a component identifier to the versioned component id to assign; may return null to clear it
+     */
+    private void applyVersionedComponentIds(final ProcessGroup processGroup, final Function<String, String> lookup) {
+        processGroup.setVersionedComponentId(lookup.apply(processGroup.getIdentifier()));
         processGroup.getConnections().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getProcessors().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getInputPorts().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getOutputPorts().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getLabels().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getFunnels().stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getControllerServices(false).stream()
-            .forEach(component -> component.setVersionedComponentId(versionedComponentIds.get(component.getIdentifier())));
+            .forEach(component -> component.setVersionedComponentId(lookup.apply(component.getIdentifier())));
         processGroup.getRemoteProcessGroups().stream()
             .forEach(rpg -> {
-                rpg.setVersionedComponentId(versionedComponentIds.get(rpg.getIdentifier()));
+                rpg.setVersionedComponentId(lookup.apply(rpg.getIdentifier()));
                 rpg.getInputPorts().stream()
-                    .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
+                    .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
                 rpg.getOutputPorts().stream()
-                    .forEach(port -> port.setVersionedComponentId(versionedComponentIds.get(port.getIdentifier())));
+                    .forEach(port -> port.setVersionedComponentId(lookup.apply(port.getIdentifier())));
             });
         processGroup.getProcessGroups().stream()
-            .forEach(childGroup -> updateVersionedComponentIds(childGroup, versionedComponentIds));
+            // only descend into children that are NOT independently version controlled
+            .filter(childGroup -> childGroup.getVersionControlInformation() == null)
+            .forEach(childGroup -> applyVersionedComponentIds(childGroup, lookup));
     }
@@ -2931,10 +2939,10 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
- public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty) {
+ public void updateFlow(final VersionedFlowSnapshot proposedSnapshot, final String componentIdSeed, final boolean verifyNotDirty, final boolean updateSettings) {
writeLock.lock();
try {
- verifyCanUpdate(proposedSnapshot, true, verifyNotDirty); // TODO: Should perform more verification... verifyCanDelete, verifyCanUpdate, etc. Recursively if child is under VC also
+ verifyCanUpdate(proposedSnapshot, true, verifyNotDirty);
final NiFiRegistryFlowMapper mapper = new NiFiRegistryFlowMapper();
final VersionedProcessGroup versionedGroup = mapper.mapProcessGroup(this, flowController.getFlowRegistryClient());
@@ -2950,15 +2958,15 @@ public final class StandardProcessGroup implements ProcessGroup {
.map(diff -> diff.getComponentA() == null ? diff.getComponentB().getIdentifier() : diff.getComponentA().getIdentifier())
.collect(Collectors.toSet());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
- } else {
- // TODO: Remove the actual differences from the info level log. It can be extremely verbose. Is here only for testing purposes becuase it's much more convenient
- // than having to remember to enable DEBUG level logging every time a full build is done.
- LOG.info("Updating {} to {}; there are {} differences to take into account: {}", this, proposedSnapshot, flowComparison.getDifferences().size(), flowComparison.getDifferences());
+ if (LOG.isInfoEnabled()) {
+ final String differencesByLine = flowComparison.getDifferences().stream()
+ .map(FlowDifference::toString)
+ .collect(Collectors.joining("\n"));
+
+ LOG.info("Updating {} to {}; there are {} differences to take into account:\n{}", this, proposedSnapshot, flowComparison.getDifferences().size(), differencesByLine);
}
- updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false);
+ updateProcessGroup(this, proposedSnapshot.getFlowContents(), componentIdSeed, updatedVersionedComponentIds, false, updateSettings);
} catch (final ProcessorInstantiationException pie) {
throw new RuntimeException(pie);
} finally {
@@ -2968,10 +2976,14 @@ public final class StandardProcessGroup implements ProcessGroup {
private void updateProcessGroup(final ProcessGroup group, final VersionedProcessGroup proposed, final String componentIdSeed,
- final Set<String> updatedVersionedComponentIds, final boolean updatePosition) throws ProcessorInstantiationException {
+ final Set<String> updatedVersionedComponentIds, final boolean updatePosition, final boolean updateName) throws ProcessorInstantiationException {
group.setComments(proposed.getComments());
- group.setName(proposed.getName());
+
+ if (updateName) {
+ group.setName(proposed.getName());
+ }
+
if (updatePosition && proposed.getPosition() != null) {
group.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
}
@@ -2998,7 +3010,7 @@ public final class StandardProcessGroup implements ProcessGroup {
group.setVariables(updatedVariableMap);
- final RemoteFlowCoordinates remoteCoordinates = proposed.getRemoteFlowCoordinates();
+ final VersionedFlowCoordinates remoteCoordinates = proposed.getVersionedFlowCoordinates();
if (remoteCoordinates != null) {
final String registryId = flowController.getFlowRegistryClient().getFlowRegistryId(remoteCoordinates.getRegistryUrl());
final String bucketId = remoteCoordinates.getBucketId();
@@ -3022,7 +3034,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ProcessGroup added = addProcessGroup(proposedChildGroup, componentIdSeed);
LOG.info("Added {} to {}", added, this);
} else {
- updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true);
+ updateProcessGroup(childGroup, proposedChildGroup, componentIdSeed, updatedVersionedComponentIds, true, updateName);
LOG.info("Updated {}", childGroup);
}
@@ -3136,14 +3148,29 @@ public final class StandardProcessGroup implements ProcessGroup {
final Map<String, ProcessorNode> processorsByVersionedId = group.getProcessors().stream()
.collect(Collectors.toMap(component -> component.getVersionedComponentId().orElse(component.getIdentifier()), Function.identity()));
final Set<String> processorsRemoved = new HashSet<>(processorsByVersionedId.keySet());
+ final Map<ProcessorNode, Set<Relationship>> autoTerminatedRelationships = new HashMap<>();
for (final VersionedProcessor proposedProcessor : proposed.getProcessors()) {
final ProcessorNode processor = processorsByVersionedId.get(proposedProcessor.getIdentifier());
if (processor == null) {
final ProcessorNode added = addProcessor(proposedProcessor, componentIdSeed);
+
+ final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream()
+ .map(relName -> added.getRelationship(relName))
+ .collect(Collectors.toSet());
+ autoTerminatedRelationships.put(added, proposedAutoTerminated);
LOG.info("Added {} to {}", added, this);
} else if (updatedVersionedComponentIds.contains(proposedProcessor.getIdentifier())) {
updateProcessor(processor, proposedProcessor);
+
+ final Set<Relationship> proposedAutoTerminated = proposedProcessor.getAutoTerminatedRelationships().stream()
+ .map(relName -> processor.getRelationship(relName))
+ .collect(Collectors.toSet());
+
+ if (!processor.getAutoTerminatedRelationships().equals(proposedAutoTerminated)) {
+ autoTerminatedRelationships.put(processor, proposedAutoTerminated);
+ }
+
LOG.info("Updated {}", processor);
} else {
processor.setPosition(new Position(proposedProcessor.getPosition().getX(), proposedProcessor.getPosition().getY()));
@@ -3205,6 +3232,13 @@ public final class StandardProcessGroup implements ProcessGroup {
group.removeConnection(connection);
}
+ // Once the appropriate connections have been removed, we may now update Processors' auto-terminated relationships.
+ // We cannot do this above, in the 'updateProcessor' call because if a connection is removed and changed to auto-terminated,
+ // then updating this in the updateProcessor call above would attempt to set the Relationship to being auto-terminated while a
+ // Connection for that relationship exists. This will throw an Exception.
+ autoTerminatedRelationships.forEach((proc, rels) -> proc.setAutoTerminatedRelationships(rels));
+
+ // Remove all controller services no longer in use
for (final String removedVersionedId : controllerServicesRemoved) {
final ControllerServiceNode service = servicesByVersionedId.get(removedVersionedId);
LOG.info("Removing {} from {}", service, group);
@@ -3276,7 +3310,7 @@ public final class StandardProcessGroup implements ProcessGroup {
final ProcessGroup group = flowController.createProcessGroup(generateUuid(componentIdSeed));
group.setVersionedComponentId(proposed.getIdentifier());
addProcessGroup(group);
- updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true);
+ updateProcessGroup(group, proposed, componentIdSeed, Collections.emptySet(), true, true);
return group;
}
@@ -3535,10 +3569,6 @@ public final class StandardProcessGroup implements ProcessGroup {
processor.setYieldPeriod(proposed.getYieldDuration());
processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
- processor.setAutoTerminatedRelationships(proposed.getAutoTerminatedRelationships().stream()
- .map(relName -> processor.getRelationship(relName))
- .collect(Collectors.toSet()));
-
if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
@@ -3547,6 +3577,7 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
+
private Map<String, String> populatePropertiesMap(final Map<PropertyDescriptor, String> currentProperties, final Map<String, String> proposedProperties) {
final Map<String, String> fullPropertyMap = new HashMap<>();
for (final PropertyDescriptor property : currentProperties.keySet()) {
@@ -3646,7 +3677,7 @@ public final class StandardProcessGroup implements ProcessGroup {
@Override
public String getName() {
- return "Flow Under Version Control";
+ return "Versioned Flow";
}
};
@@ -3659,7 +3690,7 @@ public final class StandardProcessGroup implements ProcessGroup {
.findAny()
.isPresent();
- LOG.debug("There are {} differences between this flow and the versioned snapshot of this flow: {}", differences.size(), differences);
+ LOG.debug("There are {} differences between this Local FLow and the Versioned Flow: {}", differences.size(), differences);
return Optional.of(modified);
}
@@ -3669,27 +3700,24 @@ public final class StandardProcessGroup implements ProcessGroup {
readLock.lock();
try {
final VersionControlInformation versionControlInfo = getVersionControlInformation();
- if (versionControlInfo == null) {
- throw new IllegalStateException("Cannot update the Version of the flow for " + this
- + " because the Process Group is not currently under Version Control");
- }
-
- if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
- throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
- }
-
- if (verifyNotDirty) {
- final Optional<Boolean> modifiedOption = versionControlInfo.getModified();
- if (!modifiedOption.isPresent()) {
- throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
- + "has not yet been synchronized with the Flow Registry. The Process Group must be"
- + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
+ if (versionControlInfo != null) {
+ if (!versionControlInfo.getFlowIdentifier().equals(updatedFlow.getSnapshotMetadata().getFlowIdentifier())) {
+ throw new IllegalStateException(this + " is under version control but the given flow does not match the flow that this Process Group is synchronized with");
}
- if (Boolean.TRUE.equals(modifiedOption.get())) {
- throw new IllegalStateException("Cannot change the Version of the flow for " + this
- + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be"
- + " restored to its original form before changing the version");
+ if (verifyNotDirty) {
+ final Optional<Boolean> modifiedOption = versionControlInfo.getModified();
+ if (!modifiedOption.isPresent()) {
+ throw new IllegalStateException(this + " cannot be updated to a different version of the flow because the local flow "
+ + "has not yet been synchronized with the Flow Registry. The Process Group must be"
+ + " synched with the Flow Registry before continuing. This will happen periodically in the background, so please try the request again later");
+ }
+
+ if (Boolean.TRUE.equals(modifiedOption.get())) {
+ throw new IllegalStateException("Cannot change the Version of the flow for " + this
+ + " because the Process Group has been modified since it was last synchronized with the Flow Registry. The Process Group must be"
+ + " restored to its original form before changing the version");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
new file mode 100644
index 0000000..da5880c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistry.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.registry.flow;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.registry.bucket.Bucket;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A simple file-based implementation of a Flow Registry. Rather than interacting
+ * with an actual Flow Registry, this implementation simply reads flows from disk and writes
+ * them to disk. It is not meant for any production use but is available for testing purposes.
+ */
+public class FileBasedFlowRegistry implements FlowRegistry {
+ private final File directory;
+ private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>();
+ private final JsonFactory jsonFactory = new JsonFactory();
+ private final String id;
+ private volatile String name = "Local Registry";
+ private volatile String url = "file:" + (new File("..").getAbsolutePath());
+ private volatile String description = "Default file-based Flow Registry";
+
+ /**
+  * Creates a registry that stores its buckets and flows beneath the directory named by the given URL.
+  * The directory is created if it does not exist, and any flows already stored there are recovered.
+  *
+  * @param id the identifier to report from {@link #getIdentifier()}
+  * @param url a URL with the {@code file} scheme whose path names the storage directory
+  * @throws IllegalArgumentException if the URL does not use the {@code file} scheme or carries no path
+  * @throws IOException if the directory cannot be created or its contents cannot be listed
+  */
+ public FileBasedFlowRegistry(final String id, final String url) throws IOException {
+     final URI uri = URI.create(url);
+
+     // getScheme() returns null for a relative URI, so compare from the constant side
+     // rather than dereferencing the result.
+     if (!"file".equalsIgnoreCase(uri.getScheme())) {
+         throw new IllegalArgumentException("Cannot create a File Based Flow Registry with a URL of " + url + "; URL scheme must be 'file'");
+     }
+
+     // getPath() returns null for an opaque URI such as "file:some/relative/dir".
+     final String path = uri.getPath();
+     if (path == null) {
+         throw new IllegalArgumentException("Cannot create a File Based Flow Registry with a URL of " + url + "; URL must contain a path");
+     }
+
+     this.directory = new File(path);
+
+     if (!directory.exists() && !directory.mkdirs()) {
+         throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
+     }
+
+     this.id = id;
+     this.url = url;
+     recoverBuckets();
+ }
+
+ /**
+  * Scans the registry directory and rebuilds the in-memory index of flow names per bucket.
+  * Each sub-directory of the registry directory is treated as a bucket, and each of its children
+  * that contains a {@code flow.properties} file with a {@code name} entry is treated as a flow.
+  *
+  * @throws IOException if a directory cannot be listed or a properties file cannot be read
+  */
+ private void recoverBuckets() throws IOException {
+     final File[] bucketDirs = directory.listFiles();
+     if (bucketDirs == null) {
+         throw new IOException("Could not get listing of directory " + directory);
+     }
+
+     for (final File bucketDir : bucketDirs) {
+         // A stray regular file in the registry directory is not a bucket. Without this check,
+         // listFiles() would return null for it and abort construction of the registry.
+         if (!bucketDir.isDirectory()) {
+             continue;
+         }
+
+         final File[] flowDirs = bucketDir.listFiles();
+         if (flowDirs == null) {
+             throw new IOException("Could not get listing of directory " + bucketDir);
+         }
+
+         final Set<String> flowNames = new HashSet<>();
+         for (final File flowDir : flowDirs) {
+             final File propsFile = new File(flowDir, "flow.properties");
+             if (!propsFile.exists()) {
+                 continue;
+             }
+
+             final Properties properties = new Properties();
+             try (final InputStream in = new FileInputStream(propsFile)) {
+                 properties.load(in);
+             }
+
+             final String flowName = properties.getProperty("name");
+             if (flowName == null) {
+                 continue;
+             }
+
+             flowNames.add(flowName);
+         }
+
+         if (!flowNames.isEmpty()) {
+             flowNamesByBucket.put(bucketDir.getName(), flowNames);
+         }
+     }
+ }
+
+ /**
+  * @return the URL most recently assigned to this registry, either by the constructor or by {@code setURL}
+  */
+ @Override
+ public String getURL() {
+ return url;
+ }
+
+ /**
+  * @return the current display name of this registry
+  */
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ /**
+  * Lists every bucket in the registry directory together with the flows stored in it.
+  * A bucket is a sub-directory of the registry directory; its identifier is the directory name
+  * and its created timestamp is taken from the directory's last-modified time.
+  *
+  * @param user the user requesting the listing; not consulted by this implementation
+  * @return the buckets found on disk, possibly empty
+  * @throws IOException if the registry directory cannot be listed or a flow cannot be read
+  */
+ @Override
+ public Set<Bucket> getBuckets(NiFiUser user) throws IOException {
+     final Set<Bucket> buckets = new HashSet<>();
+
+     final File[] bucketDirs = directory.listFiles();
+     if (bucketDirs == null) {
+         throw new IOException("Could not get listing of directory " + directory);
+     }
+
+     for (final File bucketDirectory : bucketDirs) {
+         // Regular files lying in the registry directory are not buckets.
+         if (!bucketDirectory.isDirectory()) {
+             continue;
+         }
+
+         final String bucketIdentifier = bucketDirectory.getName();
+         final long creation = bucketDirectory.lastModified();
+
+         final Bucket bucket = new Bucket();
+         bucket.setIdentifier(bucketIdentifier);
+         bucket.setName("Bucket '" + bucketIdentifier + "'");
+         bucket.setCreatedTimestamp(creation);
+
+         final Set<VersionedFlow> versionedFlows = new HashSet<>();
+         final File[] flowDirs = bucketDirectory.listFiles();
+         if (flowDirs != null) {
+             for (final File flowDir : flowDirs) {
+                 // Only entries that carry a flow.properties file are flows. Anything else would
+                 // make getVersionedFlow fail with a FileNotFoundException and break the whole listing.
+                 if (!new File(flowDir, "flow.properties").isFile()) {
+                     continue;
+                 }
+
+                 final String flowIdentifier = flowDir.getName();
+                 try {
+                     final VersionedFlow versionedFlow = getVersionedFlow(bucketIdentifier, flowIdentifier);
+                     versionedFlows.add(versionedFlow);
+                 } catch (UnknownResourceException e) {
+                     // The flow disappeared between the listing and the read; leave it out.
+                     continue;
+                 }
+             }
+         }
+
+         bucket.setVersionedFlows(versionedFlows);
+
+         buckets.add(bucket);
+     }
+
+     return buckets;
+ }
+
+
+ /**
+  * Registers a new flow in an existing bucket by creating a directory for it and writing
+  * its {@code flow.properties} file. The flow is assigned a freshly generated UUID.
+  *
+  * @param flow the flow to register; its bucket identifier and name must be non-null
+  * @return a copy of the given flow with the generated identifier populated
+  * @throws IOException if the flow directory or its properties file cannot be written
+  * @throws UnknownResourceException if the bucket does not exist
+  * @throws IllegalArgumentException if the bucket already holds a flow with the same name
+  */
+ @Override
+ public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
+     Objects.requireNonNull(flow);
+     Objects.requireNonNull(flow.getBucketIdentifier());
+     Objects.requireNonNull(flow.getName());
+
+     // Verify that bucket exists
+     final File bucketDir = new File(directory, flow.getBucketIdentifier());
+     if (!bucketDir.exists()) {
+         throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
+     }
+
+     // Verify that there is no flow with the same name in that bucket
+     final Set<String> flowNames = flowNamesByBucket.get(flow.getBucketIdentifier());
+     if (flowNames != null && flowNames.contains(flow.getName())) {
+         throw new IllegalArgumentException("Flow with name '" + flow.getName() + "' already exists for Bucket with ID " + flow.getBucketIdentifier());
+     }
+
+     final String flowIdentifier = UUID.randomUUID().toString();
+     final File flowDir = new File(bucketDir, flowIdentifier);
+     if (!flowDir.mkdirs()) {
+         throw new IOException("Failed to create directory " + flowDir + " for new Flow");
+     }
+
+     final File propertiesFile = new File(flowDir, "flow.properties");
+
+     final Properties flowProperties = new Properties();
+     flowProperties.setProperty("name", flow.getName());
+     flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp()));
+     // Properties rejects null values, and a flow without a description is legitimate,
+     // so the entry is simply omitted in that case.
+     if (flow.getDescription() != null) {
+         flowProperties.setProperty("description", flow.getDescription());
+     }
+     flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp()));
+
+     try (final OutputStream out = new FileOutputStream(propertiesFile)) {
+         flowProperties.store(out, null);
+     }
+
+     // Record the name so that the duplicate-name check above also sees flows registered
+     // during this run, not only those recovered from disk at construction time.
+     flowNamesByBucket.computeIfAbsent(flow.getBucketIdentifier(), bucketId -> new HashSet<>()).add(flow.getName());
+
+     final VersionedFlow response = new VersionedFlow();
+     response.setBucketIdentifier(flow.getBucketIdentifier());
+     response.setCreatedTimestamp(flow.getCreatedTimestamp());
+     response.setDescription(flow.getDescription());
+     response.setIdentifier(flowIdentifier);
+     response.setModifiedTimestamp(flow.getModifiedTimestamp());
+     response.setName(flow.getName());
+
+     return response;
+ }
+
+ /**
+  * Stores a new snapshot of an already registered flow. The snapshot receives the next version
+  * number after the highest one currently on disk and is written, serialized as JSON, to the
+  * {@code flow.xml} file of a new version directory alongside a {@code snapshot.properties} file.
+  *
+  * @param flow the flow the snapshot belongs to; bucket identifier, identifier and name must be non-null
+  * @param snapshot the flow contents to store
+  * @param comments free-form comments for this version; may be null
+  * @return the stored snapshot together with its generated metadata
+  * @throws IOException if the version directory or its files cannot be written
+  * @throws UnknownResourceException if the bucket or the flow does not exist
+  */
+ @Override
+ public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments)
+         throws IOException, UnknownResourceException {
+     Objects.requireNonNull(flow);
+     Objects.requireNonNull(flow.getBucketIdentifier());
+     Objects.requireNonNull(flow.getIdentifier());
+     Objects.requireNonNull(flow.getName());
+     Objects.requireNonNull(snapshot);
+
+     final String bucketId = flow.getBucketIdentifier();
+     final String flowId = flow.getIdentifier();
+
+     // getLatestVersion verifies that both the bucket and the flow exist and scans the
+     // existing version directories, so that logic is not repeated here.
+     final int snapshotVersion = getLatestVersion(bucketId, flowId) + 1;
+
+     final File flowDir = new File(new File(directory, bucketId), flowId);
+     final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion));
+     if (!snapshotDir.mkdir()) {
+         throw new IOException("Could not create directory " + snapshotDir);
+     }
+
+     final File contentsFile = new File(snapshotDir, "flow.xml");
+
+     try (final OutputStream out = new FileOutputStream(contentsFile);
+             final JsonGenerator generator = jsonFactory.createGenerator(out)) {
+         generator.setCodec(new ObjectMapper());
+         generator.setPrettyPrinter(new DefaultPrettyPrinter());
+         generator.writeObject(snapshot);
+     }
+
+     final Properties snapshotProperties = new Properties();
+     // Properties rejects null values. Omitting the entry keeps the version directory complete
+     // instead of failing after flow.xml has already been written.
+     if (comments != null) {
+         snapshotProperties.setProperty("comments", comments);
+     }
+     snapshotProperties.setProperty("name", flow.getName());
+     final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties");
+     try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) {
+         snapshotProperties.store(out, null);
+     }
+
+     final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+     snapshotMetadata.setBucketIdentifier(bucketId);
+     snapshotMetadata.setComments(comments);
+     snapshotMetadata.setFlowIdentifier(flowId);
+     snapshotMetadata.setFlowName(flow.getName());
+     snapshotMetadata.setTimestamp(System.currentTimeMillis());
+     snapshotMetadata.setVersion(snapshotVersion);
+
+     final VersionedFlowSnapshot response = new VersionedFlowSnapshot();
+     response.setSnapshotMetadata(snapshotMetadata);
+     response.setFlowContents(snapshot);
+     return response;
+ }
+
+ /**
+  * Determines the highest snapshot version stored on disk for a flow.
+  *
+  * @param bucketId identifier of the bucket holding the flow
+  * @param flowId identifier of the flow
+  * @return the largest integer that is the name of an entry in the flow's directory, or 0 if there is none
+  * @throws IOException if the flow's directory cannot be listed
+  * @throws UnknownResourceException if the bucket or the flow does not exist
+  */
+ @Override
+ public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+     final File bucket = new File(directory, bucketId);
+     if (!bucket.exists()) {
+         throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+     }
+
+     final File flow = new File(bucket, flowId);
+     if (!flow.exists()) {
+         throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
+     }
+
+     final File[] children = flow.listFiles();
+     if (children == null) {
+         throw new IOException("Unable to perform listing of directory " + flow);
+     }
+
+     int latest = 0;
+     for (final File child : children) {
+         try {
+             latest = Math.max(latest, Integer.parseInt(child.getName()));
+         } catch (final NumberFormatException ignored) {
+             // Entries whose names are not integers (e.g. flow.properties) are not versions.
+         }
+     }
+
+     return latest;
+ }
+
+ /**
+  * Reads one stored snapshot of a flow back from disk.
+  *
+  * @param bucketId identifier of the bucket holding the flow
+  * @param flowId identifier of the flow
+  * @param version the snapshot version to read
+  * @return the snapshot contents and metadata; the metadata timestamp is the time of this call
+  * @throws IOException if the snapshot files cannot be read or parsed
+  * @throws UnknownResourceException if the bucket, the flow or the version does not exist
+  */
+ @Override
+ public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException {
+     // Verify that the bucket exists
+     final File bucketDir = new File(directory, bucketId);
+     if (!bucketDir.exists()) {
+         throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+     }
+
+     // Verify that the flow exists
+     final File flowDir = new File(bucketDir, flowId);
+     if (!flowDir.exists()) {
+         // Report the bucket ID, not the flow ID a second time.
+         throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
+     }
+
+     final File versionDir = new File(flowDir, String.valueOf(version));
+     if (!versionDir.exists()) {
+         throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
+     }
+
+     // Despite its name, flow.xml holds the JSON written by registerVersionedFlowSnapshot.
+     final File contentsFile = new File(versionDir, "flow.xml");
+
+     final VersionedProcessGroup processGroup;
+     try (final JsonParser parser = jsonFactory.createParser(contentsFile)) {
+         final ObjectMapper mapper = new ObjectMapper();
+         // Tolerate snapshots that contain fields this version of the model does not declare.
+         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+         parser.setCodec(mapper);
+         processGroup = parser.readValueAs(VersionedProcessGroup.class);
+     }
+
+     final Properties properties = new Properties();
+     final File snapshotPropsFile = new File(versionDir, "snapshot.properties");
+     try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
+         properties.load(in);
+     }
+
+     final String comments = properties.getProperty("comments");
+     final String flowName = properties.getProperty("name");
+
+     final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+     snapshotMetadata.setBucketIdentifier(bucketId);
+     snapshotMetadata.setComments(comments);
+     snapshotMetadata.setFlowIdentifier(flowId);
+     snapshotMetadata.setFlowName(flowName);
+     snapshotMetadata.setTimestamp(System.currentTimeMillis());
+     snapshotMetadata.setVersion(version);
+
+     final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+     snapshot.setFlowContents(processGroup);
+     snapshot.setSnapshotMetadata(snapshotMetadata);
+
+     return snapshot;
+ }
+
+ /**
+  * Loads the description of a flow and the metadata of every snapshot stored for it.
+  * A snapshot is any sub-directory of the flow's directory whose name is an integer.
+  *
+  * @param bucketId identifier of the bucket holding the flow
+  * @param flowId identifier of the flow
+  * @return the flow, with its snapshot metadata sorted by ascending version
+  * @throws IOException if the flow's directory cannot be listed or a properties file cannot be read
+  * @throws UnknownResourceException if the bucket or the flow does not exist
+  */
+ @Override
+ public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
+     // Verify that the bucket exists
+     final File bucketDir = new File(directory, bucketId);
+     if (!bucketDir.exists()) {
+         throw new UnknownResourceException("No bucket exists with ID " + bucketId);
+     }
+
+     // Verify that the flow exists. A directory without flow.properties was never registered as a flow.
+     final File flowDir = new File(bucketDir, flowId);
+     final File flowPropsFile = new File(flowDir, "flow.properties");
+     if (!flowDir.exists() || !flowPropsFile.exists()) {
+         throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
+     }
+
+     final Properties flowProperties = new Properties();
+     try (final InputStream in = new FileInputStream(flowPropsFile)) {
+         flowProperties.load(in);
+     }
+
+     final VersionedFlow flow = new VersionedFlow();
+     flow.setBucketIdentifier(bucketId);
+     flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created")));
+     flow.setDescription(flowProperties.getProperty("description"));
+     flow.setIdentifier(flowId);
+     flow.setModifiedTimestamp(flowDir.lastModified());
+     flow.setName(flowProperties.getProperty("name"));
+
+     final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion());
+
+     final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator);
+     flow.setSnapshotMetadata(snapshotMetadataSet);
+
+     final File[] versionDirs = flowDir.listFiles();
+     if (versionDirs == null) {
+         throw new IOException("Unable to perform listing of directory " + flowDir);
+     }
+
+     for (final File file : versionDirs) {
+         if (!file.isDirectory()) {
+             continue;
+         }
+
+         int version;
+         try {
+             version = Integer.parseInt(file.getName());
+         } catch (final NumberFormatException nfe) {
+             // not a version. skip.
+             continue;
+         }
+
+         final File snapshotPropsFile = new File(file, "snapshot.properties");
+         final Properties snapshotProperties = new Properties();
+         try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
+             snapshotProperties.load(in);
+         }
+
+         final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
+         metadata.setBucketIdentifier(bucketId);
+         metadata.setComments(snapshotProperties.getProperty("comments"));
+         metadata.setFlowIdentifier(flowId);
+         metadata.setFlowName(snapshotProperties.getProperty("name"));
+         metadata.setTimestamp(file.lastModified());
+         metadata.setVersion(version);
+
+         snapshotMetadataSet.add(metadata);
+     }
+
+     // Count only real version directories. The raw listing also contains flow.properties,
+     // which would inflate the count.
+     flow.setVersionCount(snapshotMetadataSet.size());
+
+     return flow;
+ }
+
+ /**
+  * @return the identifier that was passed to the constructor
+  */
+ @Override
+ public String getIdentifier() {
+ return id;
+ }
+
+ /**
+  * @return the current description of this registry
+  */
+ @Override
+ public String getDescription() {
+ return description;
+ }
+
+ /**
+  * Replaces the description reported by {@code getDescription}.
+  *
+  * @param description the new description
+  */
+ @Override
+ public void setDescription(String description) {
+ this.description = description;
+ }
+
+ /**
+  * Replaces the URL reported by {@code getURL}. Only the stored string changes: the storage
+  * directory is a final field assigned in the constructor, so flows are still read from and
+  * written to the original location.
+  *
+  * @param url the new URL
+  */
+ @Override
+ public void setURL(String url) {
+ this.url = url;
+ }
+
+ /**
+  * Replaces the display name reported by {@code getName}.
+  *
+  * @param name the new name
+  */
+ @Override
+ public void setName(String name) {
+ this.name = name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6aa8b5c6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
deleted file mode 100644
index 2cc39c6..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/FileBasedFlowRegistryClient.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.registry.flow;
-
-import com.fasterxml.jackson.core.JsonFactory;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.nifi.authorization.user.NiFiUser;
-import org.apache.nifi.registry.bucket.Bucket;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
-
-/**
- * A simple file-based implementation of a Flow Registry Client. Rather than interacting
- * with an actual Flow Registry, this implementation simply reads flows from disk and writes
- * them to disk. It is not meant for any production use but is available for testing purposes.
- */
-public class FileBasedFlowRegistryClient implements FlowRegistryClient, FlowRegistry {
- private final File directory;
- private final Map<String, Set<String>> flowNamesByBucket = new HashMap<>();
- private final JsonFactory jsonFactory = new JsonFactory();
-
- public FileBasedFlowRegistryClient(final File directory) throws IOException {
- if (!directory.exists() && !directory.mkdirs()) {
- throw new IOException("Could not access or create directory " + directory.getAbsolutePath() + " for Flow Registry");
- }
-
- this.directory = directory;
- recoverBuckets();
- }
-
-    /**
-     * Rebuilds the in-memory index of flow names per bucket from the registry directory.
-     * Every directory directly beneath the registry directory is treated as a bucket. Each
-     * entry inside a bucket that holds a {@code flow.properties} file with a {@code name}
-     * property contributes that name to the bucket's set of flow names. Buckets without
-     * any named flow are not added to the index.
-     *
-     * @throws IOException if the registry directory or a bucket directory cannot be listed,
-     *             or if a {@code flow.properties} file cannot be read
-     */
-    private void recoverBuckets() throws IOException {
-        final File[] bucketDirs = directory.listFiles();
-        if (bucketDirs == null) {
-            throw new IOException("Could not get listing of directory " + directory);
-        }
-
-        for (final File bucketDir : bucketDirs) {
-            // Only directories are buckets. listFiles() returns null for a regular file, so
-            // without this guard a single stray file in the registry directory would make
-            // the listing check below throw and abort construction of the client.
-            if (!bucketDir.isDirectory()) {
-                continue;
-            }
-
-            final File[] flowDirs = bucketDir.listFiles();
-            if (flowDirs == null) {
-                throw new IOException("Could not get listing of directory " + bucketDir);
-            }
-
-            final Set<String> flowNames = new HashSet<>();
-            for (final File flowDir : flowDirs) {
-                // Entries without a flow.properties file are not flows; skip them.
-                final File propsFile = new File(flowDir, "flow.properties");
-                if (!propsFile.exists()) {
-                    continue;
-                }
-
-                final Properties properties = new Properties();
-                try (final InputStream in = new FileInputStream(propsFile)) {
-                    properties.load(in);
-                }
-
-                final String flowName = properties.getProperty("name");
-                if (flowName == null) {
-                    continue;
-                }
-
-                flowNames.add(flowName);
-            }
-
-            if (!flowNames.isEmpty()) {
-                flowNamesByBucket.put(bucketDir.getName(), flowNames);
-            }
-        }
-    }
-
-    /**
-     * Looks up a registry by identifier. This client doubles as its own single registry,
-     * which is known under the identifier {@code "default"}.
-     *
-     * @param registryId the identifier to look up
-     * @return this instance if {@code registryId} is {@code "default"}, otherwise {@code null}
-     */
-    @Override
-    public FlowRegistry getFlowRegistry(final String registryId) {
-        return "default".equals(registryId) ? this : null;
-    }
-
-    /**
-     * Returns the location of this registry, expressed as the URI of its root directory.
-     *
-     * @return the string form of the registry directory's URI
-     */
-    @Override
-    public String getURL() {
-        final String location = directory.toURI().toString();
-        return location;
-    }
-
-    /**
-     * Returns the fixed display name of this registry.
-     *
-     * @return the literal {@code "Local Registry"}
-     */
-    @Override
-    public String getName() {
-        final String displayName = "Local Registry";
-        return displayName;
-    }
-
-    /**
-     * Lists the buckets of this registry. Every directory directly beneath the registry
-     * directory is reported as one bucket whose identifier is the directory name.
-     *
-     * @param user the requesting user; not consulted by this implementation
-     * @return a new set holding one {@link Bucket} per bucket directory, possibly empty
-     * @throws IOException if the registry directory cannot be listed
-     */
-    @Override
-    public Set<Bucket> getBuckets(NiFiUser user) throws IOException {
-        final File[] bucketDirs = directory.listFiles();
-        if (bucketDirs == null) {
-            throw new IOException("Could not get listing of directory " + directory);
-        }
-
-        final Set<Bucket> buckets = new HashSet<>();
-        for (final File bucketDirectory : bucketDirs) {
-            // A regular file in the registry directory is not a bucket: no flow directory
-            // can be created beneath it, so it must not be offered to callers as one.
-            if (!bucketDirectory.isDirectory()) {
-                continue;
-            }
-
-            final String bucketIdentifier = bucketDirectory.getName();
-
-            final Bucket bucket = new Bucket();
-            bucket.setIdentifier(bucketIdentifier);
-            bucket.setName("Bucket '" + bucketIdentifier + "'");
-            // The directory's last-modified time stands in for the creation timestamp.
-            bucket.setCreatedTimestamp(bucketDirectory.lastModified());
-
-            buckets.add(bucket);
-        }
-
-        return buckets;
-    }
-
-    /**
-     * Registers a new flow in an existing bucket. A fresh UUID is generated as the flow
-     * identifier, a directory of that name is created beneath the bucket directory, and the
-     * flow's name, timestamps and (if present) description are written to a
-     * {@code flow.properties} file inside it.
-     *
-     * @param flow the flow to register; its bucket identifier and name must not be null
-     * @return a new {@link VersionedFlow} carrying the generated identifier together with
-     *         the values copied from {@code flow}
-     * @throws IOException if the flow directory or its properties file cannot be created
-     * @throws UnknownResourceException if the bucket does not exist
-     * @throws IllegalArgumentException if the bucket already contains a flow of that name
-     * @throws NullPointerException if {@code flow}, its bucket identifier or its name is null
-     */
-    @Override
-    public synchronized VersionedFlow registerVersionedFlow(final VersionedFlow flow) throws IOException, UnknownResourceException {
-        Objects.requireNonNull(flow);
-        Objects.requireNonNull(flow.getBucketIdentifier());
-        Objects.requireNonNull(flow.getName());
-
-        final String bucketId = flow.getBucketIdentifier();
-        final String flowName = flow.getName();
-
-        // Verify that bucket exists
-        final File bucketDir = new File(directory, bucketId);
-        if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
-        }
-
-        // Verify that there is no flow with the same name in that bucket
-        final Set<String> existingNames = flowNamesByBucket.get(bucketId);
-        if (existingNames != null && existingNames.contains(flowName)) {
-            throw new IllegalArgumentException("Flow with name '" + flowName + "' already exists for Bucket with ID " + bucketId);
-        }
-
-        final String flowIdentifier = UUID.randomUUID().toString();
-        final File flowDir = new File(bucketDir, flowIdentifier);
-        if (!flowDir.mkdirs()) {
-            throw new IOException("Failed to create directory " + flowDir + " for new Flow");
-        }
-
-        final File propertiesFile = new File(flowDir, "flow.properties");
-
-        final Properties flowProperties = new Properties();
-        flowProperties.setProperty("name", flowName);
-        flowProperties.setProperty("created", String.valueOf(flow.getCreatedTimestamp()));
-        // Properties rejects null values with a NullPointerException, so an absent
-        // description is simply left out; reading it back then yields null again.
-        final String description = flow.getDescription();
-        if (description != null) {
-            flowProperties.setProperty("description", description);
-        }
-        flowProperties.setProperty("lastModified", String.valueOf(flow.getModifiedTimestamp()));
-
-        try (final OutputStream out = new FileOutputStream(propertiesFile)) {
-            flowProperties.store(out, null);
-        }
-
-        // Record the name only once the flow is safely on disk, so that the duplicate-name
-        // check above also covers flows registered since this client was constructed.
-        flowNamesByBucket.computeIfAbsent(bucketId, key -> new HashSet<>()).add(flowName);
-
-        final VersionedFlow response = new VersionedFlow();
-        response.setBucketIdentifier(bucketId);
-        response.setCreatedTimestamp(flow.getCreatedTimestamp());
-        response.setDescription(description);
-        response.setIdentifier(flowIdentifier);
-        response.setModifiedTimestamp(flow.getModifiedTimestamp());
-        response.setName(flowName);
-
-        return response;
-    }
-
-    /**
-     * Stores a new snapshot (version) of an already registered flow. The new version number
-     * is one greater than the highest numerically named entry in the flow directory. A
-     * directory of that name is created and receives the snapshot serialized as JSON in a
-     * file named {@code flow.xml}, plus a {@code snapshot.properties} file holding the flow
-     * name and, if present, the comments.
-     *
-     * @param flow the flow the snapshot belongs to; its bucket identifier and name must not be null
-     * @param snapshot the flow contents to store; must not be null
-     * @param comments free-form comments for this version; may be null
-     * @return a {@link VersionedFlowSnapshot} holding the given contents and the metadata of
-     *         the newly created version
-     * @throws IOException if the flow directory cannot be listed or the snapshot cannot be written
-     * @throws UnknownResourceException if the bucket or the flow does not exist
-     */
-    @Override
-    public synchronized VersionedFlowSnapshot registerVersionedFlowSnapshot(final VersionedFlow flow, final VersionedProcessGroup snapshot, final String comments)
-            throws IOException, UnknownResourceException {
-        Objects.requireNonNull(flow);
-        Objects.requireNonNull(flow.getBucketIdentifier());
-        Objects.requireNonNull(flow.getName());
-        Objects.requireNonNull(snapshot);
-
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, flow.getBucketIdentifier());
-        if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + flow.getBucketIdentifier());
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flow.getIdentifier());
-        if (!flowDir.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flow.getIdentifier() + " exists for Bucket with ID " + flow.getBucketIdentifier());
-        }
-
-        final File[] versionDirs = flowDir.listFiles();
-        if (versionDirs == null) {
-            throw new IOException("Unable to perform listing of directory " + flowDir);
-        }
-
-        // Find the highest existing version; entries whose name is not a number
-        // (such as flow.properties) are not versions and are ignored.
-        int maxVersion = 0;
-        for (final File versionDir : versionDirs) {
-            final int version;
-            try {
-                version = Integer.parseInt(versionDir.getName());
-            } catch (final NumberFormatException nfe) {
-                continue;
-            }
-
-            if (version > maxVersion) {
-                maxVersion = version;
-            }
-        }
-
-        final int snapshotVersion = maxVersion + 1;
-        final File snapshotDir = new File(flowDir, String.valueOf(snapshotVersion));
-        if (!snapshotDir.mkdir()) {
-            throw new IOException("Could not create directory " + snapshotDir);
-        }
-
-        // The file is named flow.xml but its content is JSON; the name is kept because
-        // getFlowContents reads the snapshot back from this exact file name.
-        final File contentsFile = new File(snapshotDir, "flow.xml");
-
-        try (final OutputStream out = new FileOutputStream(contentsFile);
-                final JsonGenerator generator = jsonFactory.createGenerator(out)) {
-            generator.setCodec(new ObjectMapper());
-            generator.setPrettyPrinter(new DefaultPrettyPrinter());
-            generator.writeObject(snapshot);
-        }
-
-        final Properties snapshotProperties = new Properties();
-        // Properties rejects null values with a NullPointerException, so absent comments
-        // are left out; reading the property back then yields null again.
-        if (comments != null) {
-            snapshotProperties.setProperty("comments", comments);
-        }
-        snapshotProperties.setProperty("name", flow.getName());
-        final File snapshotPropsFile = new File(snapshotDir, "snapshot.properties");
-        try (final OutputStream out = new FileOutputStream(snapshotPropsFile)) {
-            snapshotProperties.store(out, null);
-        }
-
-        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
-        snapshotMetadata.setBucketIdentifier(flow.getBucketIdentifier());
-        snapshotMetadata.setComments(comments);
-        snapshotMetadata.setFlowIdentifier(flow.getIdentifier());
-        snapshotMetadata.setFlowName(flow.getName());
-        snapshotMetadata.setTimestamp(System.currentTimeMillis());
-        snapshotMetadata.setVersion(snapshotVersion);
-
-        final VersionedFlowSnapshot response = new VersionedFlowSnapshot();
-        response.setSnapshotMetadata(snapshotMetadata);
-        response.setFlowContents(snapshot);
-        return response;
-    }
-
-    /**
-     * Returns the identifiers of the registries this client knows about. There is exactly
-     * one, named {@code "default"}.
-     *
-     * @return an immutable single-element set containing {@code "default"}
-     */
-    @Override
-    public Set<String> getRegistryIdentifiers() {
-        final Set<String> onlyRegistry = Collections.singleton("default");
-        return onlyRegistry;
-    }
-
-    /**
-     * Determines the highest snapshot version stored for a flow, i.e. the largest number
-     * that appears as the name of an entry in the flow's directory.
-     *
-     * @param bucketId the identifier of the bucket holding the flow
-     * @param flowId the identifier of the flow
-     * @return the highest version found, or {@code 0} if no entry has a numeric name
-     * @throws IOException if the flow directory cannot be listed
-     * @throws UnknownResourceException if the bucket or the flow does not exist
-     */
-    @Override
-    public int getLatestVersion(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
-        // The bucket must exist ...
-        final File bucketLocation = new File(directory, bucketId);
-        if (!bucketLocation.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
-        }
-
-        // ... and so must the flow inside it.
-        final File flowLocation = new File(bucketLocation, flowId);
-        if (!flowLocation.exists()) {
-            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
-        }
-
-        final File[] children = flowLocation.listFiles();
-        if (children == null) {
-            throw new IOException("Unable to perform listing of directory " + flowLocation);
-        }
-
-        int latest = 0;
-        for (final File child : children) {
-            try {
-                latest = Math.max(latest, Integer.parseInt(child.getName()));
-            } catch (final NumberFormatException ignored) {
-                // The entry's name is not a version number, so it is not a snapshot; skip it.
-            }
-        }
-
-        return latest;
-    }
-
-    /**
-     * Loads one stored snapshot of a flow. The flow contents are parsed as JSON from the
-     * {@code flow.xml} file in the version directory (unknown JSON properties are ignored),
-     * and the comments and flow name are read from the accompanying
-     * {@code snapshot.properties} file.
-     *
-     * @param bucketId the identifier of the bucket holding the flow
-     * @param flowId the identifier of the flow
-     * @param version the snapshot version to load
-     * @return the snapshot contents together with its metadata
-     * @throws IOException if the snapshot files cannot be read or parsed
-     * @throws UnknownResourceException if the bucket, the flow, or the requested version does not exist
-     */
-    @Override
-    public VersionedFlowSnapshot getFlowContents(final String bucketId, final String flowId, int version) throws IOException, UnknownResourceException {
-        // Verify that the bucket exists
-        final File bucketDir = new File(directory, bucketId);
-        if (!bucketDir.exists()) {
-            throw new UnknownResourceException("No bucket exists with ID " + bucketId);
-        }
-
-        // Verify that the flow exists
-        final File flowDir = new File(bucketDir, flowId);
-        if (!flowDir.exists()) {
-            // Report the bucket ID here; the message previously repeated the flow ID.
-            throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + bucketId);
-        }
-
-        final File versionDir = new File(flowDir, String.valueOf(version));
-        if (!versionDir.exists()) {
-            throw new UnknownResourceException("Flow with ID " + flowId + " in Bucket with ID " + bucketId + " does not contain a snapshot with version " + version);
-        }
-
-        // Despite its name, flow.xml holds the JSON written by registerVersionedFlowSnapshot.
-        final File contentsFile = new File(versionDir, "flow.xml");
-
-        final VersionedProcessGroup processGroup;
-        try (final JsonParser parser = jsonFactory.createParser(contentsFile)) {
-            final ObjectMapper mapper = new ObjectMapper();
-            mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
-            parser.setCodec(mapper);
-            processGroup = parser.readValueAs(VersionedProcessGroup.class);
-        }
-
-        final Properties properties = new Properties();
-        final File snapshotPropsFile = new File(versionDir, "snapshot.properties");
-        try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
-            properties.load(in);
-        }
-
-        final String comments = properties.getProperty("comments");
-        final String flowName = properties.getProperty("name");
-
-        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
-        snapshotMetadata.setBucketIdentifier(bucketId);
-        snapshotMetadata.setComments(comments);
-        snapshotMetadata.setFlowIdentifier(flowId);
-        snapshotMetadata.setFlowName(flowName);
-        // NOTE(review): this reports the time of the read, whereas getVersionedFlow reports
-        // the version directory's last-modified time for the same snapshot - confirm which
-        // one callers expect before aligning the two.
-        snapshotMetadata.setTimestamp(System.currentTimeMillis());
-        snapshotMetadata.setVersion(version);
-
-        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
-        snapshot.setFlowContents(processGroup);
-        snapshot.setSnapshotMetadata(snapshotMetadata);
-
-        return snapshot;
-    }
-
- @Override
- public VersionedFlow getVersionedFlow(final String bucketId, final String flowId) throws IOException, UnknownResourceException {
- // Verify that the bucket exists
- final File bucketDir = new File(directory, bucketId);
- if (!bucketDir.exists()) {
- throw new UnknownResourceException("No bucket exists with ID " + bucketId);
- }
-
- // Verify that the flow exists
- final File flowDir = new File(bucketDir, flowId);
- if (!flowDir.exists()) {
- throw new UnknownResourceException("No Flow with ID " + flowId + " exists for Bucket with ID " + flowId);
- }
-
- final File flowPropsFile = new File(flowDir, "flow.properties");
- final Properties flowProperties = new Properties();
- try (final InputStream in = new FileInputStream(flowPropsFile)) {
- flowProperties.load(in);
- }
-
- final VersionedFlow flow = new VersionedFlow();
- flow.setBucketIdentifier(bucketId);
- flow.setCreatedTimestamp(Long.parseLong(flowProperties.getProperty("created")));
- flow.setDescription(flowProperties.getProperty("description"));
- flow.setIdentifier(flowId);
- flow.setModifiedTimestamp(flowDir.lastModified());
- flow.setName(flowProperties.getProperty("name"));
-
- final Comparator<VersionedFlowSnapshotMetadata> versionComparator = (a, b) -> Integer.compare(a.getVersion(), b.getVersion());
-
- final SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadataSet = new TreeSet<>(versionComparator);
- flow.setSnapshotMetadata(snapshotMetadataSet);
-
- final File[] versionDirs = flowDir.listFiles();
- for (final File file : versionDirs) {
- if (!file.isDirectory()) {
- continue;
- }
-
- int version;
- try {
- version = Integer.parseInt(file.getName());
- } catch (final NumberFormatException nfe) {
- // not a version. skip.
- continue;
- }
-
- final File snapshotPropsFile = new File(file, "snapshot.properties");
- final Properties snapshotProperties = new Properties();
- try (final InputStream in = new FileInputStream(snapshotPropsFile)) {
- snapshotProperties.load(in);
- }
-
- final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
- metadata.setBucketIdentifier(bucketId);
- metadata.setComments(snapshotProperties.getProperty("comments"));
- metadata.setFlowIdentifier(flowId);
- metadata.setFlowName(snapshotProperties.getProperty("name"));
- metadata.setTimestamp(file.lastModified());
- metadata.setVersion(version);
-
- snapshotMetadataSet.add(metadata);
- }
-
- return flow;
- }
-}