You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/08/22 19:50:58 UTC
[2/5] nifi-registry git commit: NIFIREG-7 Defining Provider API and
framework for loading providers - Renaming nifi-registry-flow-data-model to
nifi-registry-data-model - Implementing FileSystemFlowProvider &
FileSystemMetadataProvider - Adding unit test
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
new file mode 100644
index 0000000..8f186fd
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Standard configuration context to be passed to onConfigured method of Providers.
+ */
+public class StandardProviderConfigurationContext implements ProviderConfigurationContext {
+    /** Read-only snapshot of the name/value pairs supplied at construction. */
+    private final Map<String, String> properties;
+
+    public StandardProviderConfigurationContext(final Map<String, String> properties) {
+        final Map<String, String> snapshot = new HashMap<>(properties);
+        this.properties = Collections.unmodifiableMap(snapshot);
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
new file mode 100644
index 0000000..1a83d68
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.metadata.MetadataProvider;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.apache.nifi.registry.provider.generated.Property;
+import org.apache.nifi.registry.provider.generated.Providers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Standard implementation of ProviderFactory.
+ */
+public class StandardProviderFactory implements ProviderFactory {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(StandardProviderFactory.class);
+
+ private static final String PROVIDERS_XSD = "/providers.xsd";
+ private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.provider.generated";
+ private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    /**
+     * Creates the JAXBContext for the generated providers classes; a failure is rethrown unchecked with its cause.
+     */
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_GENERATED_PATH, StandardProviderFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+ private final NiFiRegistryProperties properties;
+ private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null);
+
+ private FlowPersistenceProvider flowPersistenceProvider;
+ private MetadataProvider metadataProvider;
+
+    public StandardProviderFactory(final NiFiRegistryProperties properties) {
+        // Reject a missing configuration up front, before any state is stored.
+        if (properties == null) {
+            throw new IllegalStateException("NiFiRegistryProperties cannot be null");
+        }
+        this.properties = properties;
+    }
+
+    @Override
+    public synchronized void initialize() throws ProviderFactoryException {
+        // Already loaded by an earlier call; the parsed configuration is kept as-is.
+        if (providersHolder.get() != null) {
+            return;
+        }
+
+        final File configFile = properties.getProvidersConfigurationFile();
+        if (!configFile.exists()) {
+            throw new ProviderFactoryException("Unable to find the providers configuration file at " + configFile.getAbsolutePath());
+        }
+
+        try {
+            // The schema is loaded from the classpath and applied while unmarshalling.
+            final Schema providersSchema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
+                    .newSchema(StandardProviderFactory.class.getResource(PROVIDERS_XSD));
+            final Unmarshaller xmlReader = JAXB_CONTEXT.createUnmarshaller();
+            xmlReader.setSchema(providersSchema);
+            // Keep the parsed configuration for the provider getters.
+            final JAXBElement<Providers> root = xmlReader.unmarshal(new StreamSource(configFile), Providers.class);
+            providersHolder.set(root.getValue());
+        } catch (SAXException | JAXBException e) {
+            throw new ProviderFactoryException("Unable to load the providers configuration file at: " + configFile.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Lazily creates and configures the MetadataProvider; throws ProviderFactoryException if no configuration is loaded.
+     */
+    @Override
+    public synchronized MetadataProvider getMetadataProvider() {
+        if (metadataProvider != null) {
+            return metadataProvider;
+        }
+        final Providers providers = providersHolder.get();
+        if (providers == null) {
+            throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider");
+        }
+
+        final org.apache.nifi.registry.provider.generated.Provider jaxbMetadataProvider = providers.getMetadataProvider();
+        final String metadataProviderClassName = jaxbMetadataProvider.getClazz();
+        final MetadataProvider createdProvider;
+        try {
+            final Class<? extends MetadataProvider> metadataProviderClass = Class.forName(metadataProviderClassName, true,
+                    StandardProviderFactory.class.getClassLoader()).asSubclass(MetadataProvider.class);
+            createdProvider = metadataProviderClass.getConstructor().newInstance();
+            LOGGER.info("Instantiated MetadataProvider with class name {}", metadataProviderClassName);
+        } catch (Exception e) {
+            throw new ProviderFactoryException("Error creating MetadataProvider with class name: " + metadataProviderClassName, e);
+        }
+
+        // Publish to the field only after onConfigured succeeds, so a provider that failed to configure is never cached.
+        createdProvider.onConfigured(createConfigurationContext(jaxbMetadataProvider.getProperty()));
+        LOGGER.info("Configured MetadataProvider with class name {}", metadataProviderClassName);
+        metadataProvider = createdProvider;
+        return metadataProvider;
+    }
+
+    /**
+     * Lazily creates and configures the FlowPersistenceProvider; throws ProviderFactoryException if no configuration is loaded.
+     */
+    @Override
+    public synchronized FlowPersistenceProvider getFlowPersistenceProvider() {
+        if (flowPersistenceProvider != null) {
+            return flowPersistenceProvider;
+        }
+        final Providers providers = providersHolder.get();
+        if (providers == null) {
+            throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider");
+        }
+        final org.apache.nifi.registry.provider.generated.Provider jaxbFlowProvider = providers.getFlowPersistenceProvider();
+        final String flowProviderClassName = jaxbFlowProvider.getClazz();
+        final FlowPersistenceProvider createdProvider;
+        try {
+            final Class<? extends FlowPersistenceProvider> flowProviderClass = Class.forName(flowProviderClassName, true,
+                    StandardProviderFactory.class.getClassLoader()).asSubclass(FlowPersistenceProvider.class);
+            createdProvider = flowProviderClass.getConstructor().newInstance();
+            LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", flowProviderClassName);
+        } catch (Exception e) {
+            throw new ProviderFactoryException("Error creating FlowPersistenceProvider with class name: " + flowProviderClassName, e);
+        }
+
+        // Publish to the field only after onConfigured succeeds, so a provider that failed to configure is never cached.
+        createdProvider.onConfigured(createConfigurationContext(jaxbFlowProvider.getProperty()));
+        LOGGER.info("Configured FlowPersistenceProvider with class name {}", flowProviderClassName);
+        flowPersistenceProvider = createdProvider;
+        return flowPersistenceProvider;
+    }
+
+    private ProviderConfigurationContext createConfigurationContext(final List<Property> configProperties) {
+        final Map<String, String> nameToValue = new HashMap<>(); // stays empty when the list is null
+        if (configProperties != null) {
+            for (final Property entry : configProperties) {
+                nameToValue.put(entry.getName(), entry.getValue());
+            }
+        }
+        return new StandardProviderConfigurationContext(nameToValue);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/main/xsd/providers.xsd
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/xsd/providers.xsd b/nifi-registry-framework/src/main/xsd/providers.xsd
new file mode 100644
index 0000000..cb71ed8
--- /dev/null
+++ b/nifi-registry-framework/src/main/xsd/providers.xsd
@@ -0,0 +1,51 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
+
+ <!-- Provider type -->
+ <xs:complexType name="Provider">
+ <xs:sequence>
+ <xs:element name="class" type="NonEmptyStringType"/>
+ <xs:element name="property" type="Property" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <!-- Name/Value properties-->
+ <xs:complexType name="Property">
+ <xs:simpleContent>
+ <xs:extension base="xs:string">
+ <xs:attribute name="name" type="NonEmptyStringType"></xs:attribute>
+ </xs:extension>
+ </xs:simpleContent>
+ </xs:complexType>
+
+ <xs:simpleType name="NonEmptyStringType">
+ <xs:restriction base="xs:string">
+ <xs:minLength value="1"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <!-- providers -->
+ <xs:element name="providers">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="metadataProvider" type="Provider" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="flowPersistenceProvider" type="Provider" minOccurs="1" maxOccurs="1" />
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+
+</xs:schema>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java
new file mode 100644
index 0000000..e0c7f16
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestStandardFlowSnapshotContext {
+ // Checks that values given to StandardFlowSnapshotContext.Builder are returned by the built context's getters.
+ @Test
+ public void testBuilder() {
+ final String bucketId = "1234-1234-1234-1234";
+ final String bucketName = "Some Bucket";
+ final String flowId = "2345-2345-2345-2345";
+ final String flowName = "Some Flow";
+ final int version = 2;
+ final String comments = "Some Comments";
+ final long timestamp = System.currentTimeMillis();
+ // build a context with each of the values above
+ final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder()
+ .bucketId(bucketId)
+ .bucketName(bucketName)
+ .flowId(flowId)
+ .flowName(flowName)
+ .version(version)
+ .comments(comments)
+ .snapshotTimestamp(timestamp)
+ .build();
+ // every getter must return exactly the value that was passed to the builder
+ Assert.assertEquals(bucketId, context.getBucketId());
+ Assert.assertEquals(bucketName, context.getBucketName());
+ Assert.assertEquals(flowId, context.getFlowId());
+ Assert.assertEquals(flowName, context.getFlowName());
+ Assert.assertEquals(version, context.getVersion());
+ Assert.assertEquals(comments, context.getComments());
+ Assert.assertEquals(timestamp, context.getSnapshotTimestamp());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java
new file mode 100644
index 0000000..2eed54f
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+
+import java.util.Map;
+
+public class MockFlowPersistenceProvider implements FlowPersistenceProvider {
+ // Test double: remembers the properties passed to onConfigured; the snapshot operations do nothing.
+ private Map<String,String> properties;
+
+ @Override
+ public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+ properties = configurationContext.getProperties();
+ }
+
+ @Override
+ public void saveSnapshot(FlowSnapshotContext context, byte[] content) throws FlowPersistenceException {
+ // no-op in this mock
+ }
+
+ @Override
+ public byte[] getSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException {
+ return new byte[0];
+ }
+
+ @Override
+ public void deleteSnapshots(String bucketId, String flowId) throws FlowPersistenceException {
+ // no-op in this mock
+ }
+
+ @Override
+ public void deleteSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException {
+ // no-op in this mock
+ }
+
+ public Map<String,String> getProperties() {
+ return properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
new file mode 100644
index 0000000..06d3e1a
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.metadata.MetadataProvider;
+
+import java.util.Map;
+import java.util.Set;
+
+public class MockMetadataProvider implements MetadataProvider {
+ // Test double: remembers the properties passed to onConfigured; every metadata operation returns null or does nothing.
+ private Map<String,String> properties;
+
+ @Override
+ public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+ this.properties = configurationContext.getProperties();
+ }
+
+ public Map<String,String> getProperties() {
+ return properties;
+ }
+
+ @Override
+ public Bucket createBucket(Bucket bucket) {
+ return null;
+ }
+
+ @Override
+ public Bucket getBucket(String bucketIdentifier) {
+ return null;
+ }
+
+ @Override
+ public Set<Bucket> getBuckets() {
+ return null;
+ }
+
+ @Override
+ public Bucket updateBucket(Bucket bucket) {
+ return null;
+ }
+
+ @Override
+ public void deleteBucket(String bucketIdentifier) {
+ // no-op in this mock
+ }
+
+ @Override
+ public VersionedFlow createFlow(String bucketIdentifier, VersionedFlow flow) {
+ return null;
+ }
+
+ @Override
+ public VersionedFlow getFlow(String flowIdentifier) {
+ return null;
+ }
+
+ @Override
+ public Set<VersionedFlow> getFlows() {
+ return null;
+ }
+
+ @Override
+ public Set<VersionedFlow> getFlows(String bucketId) {
+ return null;
+ }
+
+ @Override
+ public VersionedFlow updateFlow(VersionedFlow versionedFlow) {
+ return null;
+ }
+
+ @Override
+ public void deleteFlow(String flowIdentifier) {
+ // no-op in this mock
+ }
+
+ @Override
+ public VersionedFlowSnapshot createFlowSnapshot(VersionedFlowSnapshot flowSnapshot) {
+ return null;
+ }
+
+ @Override
+ public VersionedFlowSnapshot getFlowSnapshot(String flowIdentifier, Integer version) {
+ return null;
+ }
+
+ @Override
+ public void deleteFlowSnapshot(String flowIdentifier, Integer version) {
+ // no-op in this mock
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
new file mode 100644
index 0000000..2bec5ba
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.metadata.MetadataProvider;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestStandardProviderFactory {
+
+ @Test
+ public void testGetProvidersSuccess() {
+ final NiFiRegistryProperties props = new NiFiRegistryProperties();
+ props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-good.xml");
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props);
+ providerFactory.initialize();
+
+ final MetadataProvider metadataProvider = providerFactory.getMetadataProvider();
+ assertNotNull(metadataProvider);
+ assertTrue(metadataProvider instanceof MockMetadataProvider);
+
+ final MockMetadataProvider mockMetadataProvider = (MockMetadataProvider) metadataProvider;
+ assertNotNull(mockMetadataProvider.getProperties());
+ assertEquals("metadata foo", mockMetadataProvider.getProperties().get("Metadata Property 1"));
+ assertEquals("metadata bar", mockMetadataProvider.getProperties().get("Metadata Property 2"));
+
+ final FlowPersistenceProvider flowPersistenceProvider = providerFactory.getFlowPersistenceProvider();
+ assertNotNull(flowPersistenceProvider);
+
+ final MockFlowPersistenceProvider mockFlowProvider = (MockFlowPersistenceProvider) flowPersistenceProvider;
+ assertNotNull(mockFlowProvider.getProperties());
+ assertEquals("flow foo", mockFlowProvider.getProperties().get("Flow Property 1"));
+ assertEquals("flow bar", mockFlowProvider.getProperties().get("Flow Property 2"));
+ }
+
+ @Test(expected = ProviderFactoryException.class)
+ public void testGetMetadataProviderBeforeInitializingShouldThrowException() {
+ final NiFiRegistryProperties props = new NiFiRegistryProperties();
+ props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-good.xml");
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props);
+ providerFactory.getMetadataProvider();
+ }
+
+ @Test(expected = ProviderFactoryException.class)
+ public void testGetFlowProviderBeforeInitializingShouldThrowException() {
+ final NiFiRegistryProperties props = new NiFiRegistryProperties();
+ props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-good.xml");
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props);
+ providerFactory.getFlowPersistenceProvider();
+ }
+
+ @Test(expected = ProviderFactoryException.class)
+ public void testProvidersConfigDoesNotExist() {
+ final NiFiRegistryProperties props = new NiFiRegistryProperties();
+ props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-does-not-exist.xml");
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props);
+ providerFactory.initialize();
+ }
+
+ @Test(expected = ProviderFactoryException.class)
+ public void testMetadataProviderClassNotFound() {
+ final NiFiRegistryProperties props = new NiFiRegistryProperties();
+ props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-class-not-found.xml");
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props);
+ providerFactory.initialize();
+
+ providerFactory.getMetadataProvider();
+ }
+
+ @Test(expected = ProviderFactoryException.class)
+ public void testFlowProviderClassNotFound() {
+ final NiFiRegistryProperties props = new NiFiRegistryProperties();
+ props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-class-not-found.xml");
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props);
+ providerFactory.initialize();
+
+ providerFactory.getFlowPersistenceProvider();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml b/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml
new file mode 100644
index 0000000..8b5debe
--- /dev/null
+++ b/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<providers>
+
+ <metadataProvider>
+ <class>org.apache.nifi.registry.provider.MetadataProviderXXX</class>
+ <property name="Metadata Property 1">foo</property>
+ <property name="Metadata Property 2">bar</property>
+ </metadataProvider>
+
+ <flowPersistenceProvider>
+ <class>org.apache.nifi.registry.provider.FlowProviderXXX</class>
+ <property name="Flow Property 1">foo</property>
+ <property name="Flow Property 2">bar</property>
+ </flowPersistenceProvider>
+
+</providers>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/resources/provider/providers-good.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/resources/provider/providers-good.xml b/nifi-registry-framework/src/test/resources/provider/providers-good.xml
new file mode 100644
index 0000000..4ef2a06
--- /dev/null
+++ b/nifi-registry-framework/src/test/resources/provider/providers-good.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<providers>
+
+ <metadataProvider>
+ <class>org.apache.nifi.registry.provider.MockMetadataProvider</class>
+ <property name="Metadata Property 1">metadata foo</property>
+ <property name="Metadata Property 2">metadata bar</property>
+ </metadataProvider>
+
+ <flowPersistenceProvider>
+ <class>org.apache.nifi.registry.provider.MockFlowPersistenceProvider</class>
+ <property name="Flow Property 1">flow foo</property>
+ <property name="Flow Property 2">flow bar</property>
+ </flowPersistenceProvider>
+
+</providers>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java
----------------------------------------------------------------------
diff --git a/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java b/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java
index 5efea84..35a1b48 100644
--- a/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java
+++ b/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java
@@ -45,9 +45,12 @@ public class NiFiRegistryProperties extends Properties {
public static final String SECURITY_NEED_CLIENT_AUTH = "nifi.registry.security.needClientAuth";
public static final String SECURITY_AUTHORIZED_USERS = "nifi.registry.security.authorized.users";
+ public static final String PROVIDERS_CONFIGURATION_FILE = "nifi.registry.providers.configuration.file";
+
// Defaults
public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty";
public static final String DEFAULT_WAR_DIR = "./lib";
+ public static final String DEFAULT_PROVIDERS_CONFIGURATION_FILE = "./conf/providers.xml";
public int getWebThreads() {
int webThreads = 200;
@@ -144,4 +147,12 @@ public class NiFiRegistryProperties extends Properties {
return new File(authorizedUsersFile);
}
+ /**
+  * Resolves the location of the providers configuration file.
+  *
+  * @return a File for the value of the PROVIDERS_CONFIGURATION_FILE property, or for
+  *         DEFAULT_PROVIDERS_CONFIGURATION_FILE when that property is blank
+  */
+ public File getProvidersConfigurationFile() {
+     final String configuredPath = getProperty(PROVIDERS_CONFIGURATION_FILE);
+     final String effectivePath = StringUtils.isBlank(configuredPath)
+             ? DEFAULT_PROVIDERS_CONFIGURATION_FILE
+             : configuredPath;
+     return new File(effectivePath);
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/pom.xml b/nifi-registry-provider-api/pom.xml
new file mode 100644
index 0000000..eba6cfa
--- /dev/null
+++ b/nifi-registry-provider-api/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>nifi-registry-provider-api</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-data-model</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java
new file mode 100644
index 0000000..4287fc8
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+/**
+ * Unchecked exception signalling that a FlowPersistenceProvider failed while saving, retrieving, or deleting
+ * a flow snapshot.
+ */
+public class FlowPersistenceException extends RuntimeException {
+
+    /**
+     * Creates an exception that carries only a description of the failure.
+     *
+     * @param message a description of what went wrong
+     */
+    public FlowPersistenceException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that carries a description of the failure and the underlying cause.
+     *
+     * @param message a description of what went wrong
+     * @param cause the lower-level error that triggered this exception
+     */
+    public FlowPersistenceException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java
new file mode 100644
index 0000000..8648722
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.registry.provider.Provider;
+
+/**
+ * A service that can store and retrieve versioned flow snapshots.
+ *
+ * Snapshots are exchanged as serialized bytes and are addressed by the combination of bucket id, flow id and
+ * version.
+ *
+ * FlowPersistenceException extends RuntimeException, so the throws clauses declared below are informational;
+ * callers are not forced by the compiler to catch it.
+ *
+ * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may
+ * change across releases until the registry matures.
+ */
+public interface FlowPersistenceProvider extends Provider {
+
+ /**
+ * Persists a serialized versioned flow snapshot.
+ *
+ * The bucket, flow and version that identify the snapshot are supplied through the context rather than as
+ * separate arguments.
+ *
+ * @param context the context for the snapshot being persisted
+ * @param content the serialized snapshot to persist
+ * @throws FlowPersistenceException if the snapshot could not be persisted
+ */
+ void saveSnapshot(FlowSnapshotContext context, byte[] content) throws FlowPersistenceException;
+
+ /**
+ * Retrieves a versioned flow snapshot.
+ *
+ * A missing snapshot is reported by returning null, not by throwing.
+ *
+ * @param bucketId the bucket id where the snapshot is located
+ * @param flowId the id of the versioned flow the snapshot belongs to
+ * @param version the version of the snapshot
+ * @return the bytes for the requested snapshot, or null if not found
+ * @throws FlowPersistenceException if the snapshot could not be retrieved due to an error in underlying provider
+ */
+ byte[] getSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException;
+
+ /**
+ * Deletes all snapshots for the versioned flow with the given id.
+ *
+ * @param bucketId the bucket the versioned flow belongs to
+ * @param flowId the id of the versioned flow
+ * @throws FlowPersistenceException if the snapshots could not be deleted due to an error in underlying provider
+ */
+ void deleteSnapshots(String bucketId, String flowId) throws FlowPersistenceException;
+
+ /**
+ * Deletes the given snapshot.
+ *
+ * @param bucketId the bucket id where the snapshot is located
+ * @param flowId the id of the versioned flow the snapshot belongs to
+ * @param version the version of the snapshot
+ * @throws FlowPersistenceException if the snapshot could not be deleted due to an error in underlying provider
+ */
+ void deleteSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException;
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
new file mode 100644
index 0000000..c5e06f5
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+/**
+ * The context that will be passed to the flow provider when saving a snapshot of a versioned flow.
+ *
+ * It exposes read-only accessors for the identifying and descriptive information of the snapshot; the serialized
+ * content itself is passed separately to FlowPersistenceProvider.saveSnapshot.
+ */
+public interface FlowSnapshotContext {
+
+ /**
+ * @return the id of the bucket this snapshot belongs to
+ */
+ String getBucketId();
+
+ /**
+ * @return the name of the bucket this snapshot belongs to
+ */
+ String getBucketName();
+
+ /**
+ * @return the id of the versioned flow this snapshot belongs to
+ */
+ String getFlowId();
+
+ /**
+ * @return the name of the versioned flow this snapshot belongs to
+ */
+ String getFlowName();
+
+ /**
+ * @return the version of the snapshot
+ */
+ int getVersion();
+
+ /**
+ * @return the comments for the snapshot
+ */
+ String getComments();
+
+ /**
+ * NOTE(review): the unit and epoch of the returned value are not specified here; presumably milliseconds
+ * since the Unix epoch - confirm against the implementations.
+ *
+ * @return the timestamp the snapshot was created
+ */
+ long getSnapshotTimestamp();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
new file mode 100644
index 0000000..558f7aa
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.metadata;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.provider.Provider;
+
+import java.util.Set;
+
+/**
+ * A service for managing metadata about all objects stored by the registry.
+ *
+ * The operations cover three kinds of objects: buckets, the versioned flows inside a bucket, and the snapshots
+ * (versions) of a versioned flow.
+ *
+ * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may
+ * change across releases until the registry matures.
+ */
+public interface MetadataProvider extends Provider {
+
+ /**
+ * Creates the given bucket.
+ *
+ * @param bucket the bucket to create
+ * @return the created bucket
+ */
+ Bucket createBucket(Bucket bucket);
+
+ /**
+ * Retrieves the bucket with the given id.
+ *
+ * @param bucketIdentifier the id of the bucket to retrieve
+ * @return the bucket with the given id, or null if it does not exist
+ */
+ Bucket getBucket(String bucketIdentifier);
+
+ /**
+ * Updates the given bucket, only the name and description should be allowed to be updated.
+ *
+ * @param bucket the updated bucket to save
+ * @return the updated bucket, or null if no bucket with the given id exists
+ */
+ Bucket updateBucket(Bucket bucket);
+
+ /**
+ * Deletes the bucket with the given identifier if one exists.
+ *
+ * @param bucketIdentifier the id of the bucket to delete
+ */
+ void deleteBucket(String bucketIdentifier);
+
+ /**
+ * Retrieves all buckets known to this metadata provider.
+ *
+ * @return the set of all buckets
+ */
+ Set<Bucket> getBuckets();
+
+ /**
+ * Creates a versioned flow in the given bucket.
+ *
+ * @param bucketIdentifier the id of the bucket where the flow is being created
+ * @param flow the versioned flow to create
+ * @return the created versioned flow
+ * @throws IllegalStateException if no bucket with the given identifier exists
+ */
+ VersionedFlow createFlow(String bucketIdentifier, VersionedFlow flow);
+
+ /**
+ * Retrieves the versioned flow with the given id.
+ *
+ * @param flowIdentifier the identifier of the flow to retrieve
+ * @return the versioned flow with the given id, or null if no flow with the given id exists
+ */
+ VersionedFlow getFlow(String flowIdentifier);
+
+ /**
+ * Updates the given versioned flow, only the name and description should be allowed to be updated.
+ *
+ * NOTE(review): unlike updateBucket, the result for a flow that does not exist is not specified here - confirm
+ * against the implementations.
+ *
+ * @param versionedFlow the updated versioned flow to save
+ * @return the updated versioned flow
+ */
+ VersionedFlow updateFlow(VersionedFlow versionedFlow);
+
+ /**
+ * Deletes the versioned flow with the given identifier if one exists.
+ *
+ * @param flowIdentifier the id of the versioned flow to delete
+ */
+ void deleteFlow(String flowIdentifier);
+
+ /**
+ * Retrieves all versioned flows known to this metadata provider.
+ *
+ * @return the set of all versioned flows
+ */
+ Set<VersionedFlow> getFlows();
+
+ /**
+ * Retrieves all the versioned flows for the given bucket.
+ *
+ * @param bucketId the id of the bucket to retrieve flows for
+ * @return the set of versioned flows for the given bucket, or an empty set if none exist
+ */
+ Set<VersionedFlow> getFlows(String bucketId);
+
+ /**
+ * Creates a versioned flow snapshot.
+ *
+ * @param flowSnapshot the snapshot to create
+ * @return the created snapshot
+ * @throws IllegalStateException if the versioned flow specified by flowSnapshot.getFlowIdentifier() does not exist
+ */
+ VersionedFlowSnapshot createFlowSnapshot(VersionedFlowSnapshot flowSnapshot);
+
+ /**
+ * Retrieves the snapshot for the given flow identifier and snapshot version.
+ *
+ * NOTE(review): version is a boxed Integer; how a null version is handled is not specified here - confirm.
+ *
+ * @param flowIdentifier the identifier of the flow the snapshot belongs to
+ * @param version the version of the snapshot
+ * @return the versioned flow snapshot for the given flow identifier and version, or null if none exists
+ */
+ VersionedFlowSnapshot getFlowSnapshot(String flowIdentifier, Integer version);
+
+ /**
+ * Deletes the snapshot for the given flow identifier and version.
+ *
+ * NOTE(review): version is a boxed Integer; how a null version is handled is not specified here - confirm.
+ *
+ * @param flowIdentifier the identifier of the flow the snapshot belongs to
+ * @param version the version of the snapshot
+ */
+ void deleteFlowSnapshot(String flowIdentifier, Integer version);
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java
new file mode 100644
index 0000000..bce9352
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.metadata;
+
+/**
+ * Unchecked exception raised when a MetadataProvider runs into an error.
+ */
+public class MetadataProviderException extends RuntimeException {
+
+    /**
+     * Creates an exception that carries only a description of the failure.
+     *
+     * @param message a description of what went wrong
+     */
+    public MetadataProviderException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that carries a description of the failure and the underlying cause.
+     *
+     * @param message a description of what went wrong
+     * @param cause the lower-level error that triggered this exception
+     */
+    public MetadataProviderException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception that wraps the underlying cause without an additional description.
+     *
+     * @param cause the lower-level error that triggered this exception
+     */
+    public MetadataProviderException(final Throwable cause) {
+        super(cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java
new file mode 100644
index 0000000..4a4be28
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+/**
+ * Base interface for providers.
+ *
+ * It defines the single configuration callback shared by the provider extension points that extend it.
+ */
+public interface Provider {
+
+ /**
+ * Called to configure the Provider.
+ *
+ * ProviderCreationException extends RuntimeException, so the throws clause is informational; callers are not
+ * forced by the compiler to catch it.
+ *
+ * @param configurationContext the context containing configuration for the given provider
+ * @throws ProviderCreationException if an error occurs while the provider is configured
+ */
+ void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException;
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java
new file mode 100644
index 0000000..b4f7ed6
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.util.Map;
+
+/**
+ * A context that will be passed to providers in order to obtain configuration.
+ */
+public interface ProviderConfigurationContext {
+
+ /**
+ * Retrieves all properties the provider currently understands regardless
+ * of whether a value has been set for them or not. If no value is present
+ * then its value is null and thus any registered default for the property
+ * descriptor applies.
+ *
+ * NOTE(review): no property descriptor type is visible in this provider API; the reference to registered
+ * defaults may not apply here - confirm.
+ *
+ * @return Map of all properties
+ */
+ Map<String, String> getProperties();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java
new file mode 100644
index 0000000..d1e106c
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+/**
+ * Unchecked exception thrown when a provider cannot be created or configured.
+ */
+public class ProviderCreationException extends RuntimeException {
+
+    /**
+     * Creates an exception with neither a message nor a cause.
+     */
+    public ProviderCreationException() {
+        super();
+    }
+
+    /**
+     * Creates an exception that carries only a description of the failure.
+     *
+     * @param message a description of what went wrong
+     */
+    public ProviderCreationException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that carries a description of the failure and the underlying cause.
+     *
+     * @param message a description of what went wrong
+     * @param cause the lower-level error that triggered this exception
+     */
+    public ProviderCreationException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates an exception that wraps the underlying cause without an additional description.
+     *
+     * @param cause the lower-level error that triggered this exception
+     */
+    public ProviderCreationException(final Throwable cause) {
+        super(cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-impl/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-impl/pom.xml b/nifi-registry-provider-impl/pom.xml
new file mode 100644
index 0000000..f5999af
--- /dev/null
+++ b/nifi-registry-provider-impl/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>nifi-registry-provider-impl</artifactId>
+ <packaging>jar</packaging>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ <resource>
+ <directory>src/main/xsd</directory>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>jaxb2-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>current</id>
+ <goals>
+ <goal>xjc</goal>
+ </goals>
+ <configuration>
+ <packageName>org.apache.nifi.registry.metadata.generated</packageName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <excludes>**/metadata/generated/*.java,</excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-provider-api</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi.registry</groupId>
+ <artifactId>nifi-registry-utils</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.7.22</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java
new file mode 100644
index 0000000..fc14c14
--- /dev/null
+++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.apache.nifi.registry.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * A FlowPersistenceProvider that uses the local filesystem for storage.
+ *
+ * Each snapshot is written to
+ * {@code <Flow Storage Directory>/<bucketId>/<flowId>/<version>/<version>.snapshot}.
+ * All snapshot operations are synchronized on the provider instance.
+ */
+public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvider {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(FileSystemFlowPersistenceProvider.class);
+
+    static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory";
+
+    static final String SNAPSHOT_EXTENSION = ".snapshot";
+
+    private File flowStorageDir;
+
+    /**
+     * Reads the storage directory from the configuration and makes sure it exists and is readable and writable.
+     *
+     * @param configurationContext the context containing the {@code Flow Storage Directory} property
+     * @throws ProviderCreationException if the property is missing or blank, or the directory cannot be prepared
+     */
+    @Override
+    public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        final Map<String,String> props = configurationContext.getProperties();
+        if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided");
+        }
+
+        final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP);
+        if (StringUtils.isBlank(flowStorageDirValue)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank");
+        }
+
+        try {
+            flowStorageDir = new File(flowStorageDirValue);
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir);
+            LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", flowStorageDir.getAbsolutePath());
+        } catch (IOException e) {
+            throw new ProviderCreationException(e);
+        }
+    }
+
+    /**
+     * Writes the snapshot content to a new file, creating the bucket, flow and version directories as needed.
+     *
+     * An existing snapshot file for the same version is never overwritten. If writing fails, the partially
+     * written file is removed so that it is neither served by getSnapshot nor blocks a later retry of the
+     * same version.
+     *
+     * @param context supplies the bucket id, flow id and version that determine the file location
+     * @param content the serialized snapshot to write
+     * @throws FlowPersistenceException if a directory cannot be prepared, a snapshot already exists for the
+     *         version, or the content cannot be written
+     */
+    @Override
+    public synchronized void saveSnapshot(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException {
+        final File bucketDir = new File(flowStorageDir, context.getBucketId());
+        ensureDirectory(bucketDir, "bucket");
+
+        final File flowDir = new File(bucketDir, context.getFlowId());
+        ensureDirectory(flowDir, "flow");
+
+        final String versionString = String.valueOf(context.getVersion());
+        final File versionDir = new File(flowDir, versionString);
+        ensureDirectory(versionDir, "version");
+
+        final File versionFile = new File(versionDir, versionString + SNAPSHOT_EXTENSION);
+        if (versionFile.exists()) {
+            throw new FlowPersistenceException("Unable to save, a snapshot already exists with version " + versionString);
+        }
+
+        LOGGER.debug("Saving snapshot with filename {}", versionFile.getAbsolutePath());
+
+        try (final OutputStream out = new FileOutputStream(versionFile)) {
+            out.write(content);
+            out.flush();
+        } catch (Exception e) {
+            // The stream is already closed when this handler runs. The file did not exist before this call
+            // (checked above while holding the lock), so anything present now is an incomplete write of ours.
+            if (versionFile.exists() && !versionFile.delete()) {
+                LOGGER.warn("Unable to remove partially written snapshot at {}", versionFile.getAbsolutePath());
+            }
+            // Exception is caught broadly on purpose so that any failure keeps surfacing as a FlowPersistenceException.
+            throw new FlowPersistenceException("Unable to write snapshot to disk due to " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Reads the content of the snapshot file for the given bucket, flow and version.
+     *
+     * @param bucketId the bucket id where the snapshot is located
+     * @param flowId the id of the versioned flow the snapshot belongs to
+     * @param version the version of the snapshot
+     * @return the bytes of the snapshot file, or null if the file does not exist
+     * @throws FlowPersistenceException if the file exists but cannot be read
+     */
+    @Override
+    public synchronized byte[] getSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        LOGGER.debug("Retrieving snapshot with filename {}", snapshotFile.getAbsolutePath());
+
+        if (!snapshotFile.exists()) {
+            return null;
+        }
+
+        try (final InputStream in = new FileInputStream(snapshotFile)) {
+            return IOUtils.toByteArray(in);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error reading snapshot file: " + snapshotFile.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Removes everything stored under the directory of the given flow; the flow directory itself is kept.
+     * Does nothing if the flow directory does not exist.
+     *
+     * @param bucketId the bucket the versioned flow belongs to
+     * @param flowId the id of the versioned flow
+     * @throws FlowPersistenceException if the contents of the flow directory cannot be deleted
+     */
+    @Override
+    public synchronized void deleteSnapshots(final String bucketId, final String flowId) throws FlowPersistenceException {
+        final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId);
+        if (!flowDir.exists()) {
+            LOGGER.debug("Snapshot directory does not exist at {}", flowDir.getAbsolutePath());
+            return;
+        }
+
+        try {
+            org.apache.commons.io.FileUtils.cleanDirectory(flowDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error deleting snapshots at " + flowDir.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Deletes the snapshot file for the given bucket, flow and version. Does nothing if the file does not exist.
+     * The enclosing version directory is left in place.
+     *
+     * @param bucketId the bucket id where the snapshot is located
+     * @param flowId the id of the versioned flow the snapshot belongs to
+     * @param version the version of the snapshot
+     * @throws FlowPersistenceException if the file exists but cannot be deleted
+     */
+    @Override
+    public synchronized void deleteSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        if (!snapshotFile.exists()) {
+            LOGGER.debug("Snapshot file does not exist at {}", snapshotFile.getAbsolutePath());
+            return;
+        }
+
+        final boolean deleted = snapshotFile.delete();
+        if (!deleted) {
+            throw new FlowPersistenceException("Unable to delete snapshot at " + snapshotFile.getAbsolutePath());
+        }
+
+        LOGGER.debug("Deleted snapshot at {}", snapshotFile.getAbsolutePath());
+    }
+
+    /**
+     * Builds the location of a snapshot file; the file is not created or checked for existence.
+     *
+     * NOTE(review): the identifiers are used as path segments without validation; presumably callers only pass
+     * generated ids - confirm that values such as ".." can never reach this provider.
+     *
+     * @param bucketId the bucket id where the snapshot is located
+     * @param flowId the id of the versioned flow the snapshot belongs to
+     * @param version the version of the snapshot
+     * @return the file under the flow storage directory for the given snapshot
+     */
+    protected File getSnapshotFile(final String bucketId, final String flowId, final int version) {
+        final String snapshotFilename = bucketId + "/" + flowId + "/" + version + "/" + version + SNAPSHOT_EXTENSION;
+        return new File(flowStorageDir, snapshotFilename);
+    }
+
+    /**
+     * Makes sure the given directory exists and is readable and writable, translating failures into a
+     * FlowPersistenceException.
+     *
+     * @param dir the directory to check or create
+     * @param description the kind of directory ("bucket", "flow" or "version"), used in the error message
+     * @throws FlowPersistenceException if the directory cannot be prepared
+     */
+    private static void ensureDirectory(final File dir, final String description) throws FlowPersistenceException {
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(dir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing " + description + " directory at " + dir.getAbsolutePath(), e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
new file mode 100644
index 0000000..ccdc1a8
--- /dev/null
+++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.metadata;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.metadata.generated.Buckets;
+import org.apache.nifi.registry.metadata.generated.Flow;
+import org.apache.nifi.registry.metadata.generated.Flows;
+import org.apache.nifi.registry.metadata.generated.Metadata;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A MetadataProvider that persists metadata to the local filesystem.
+ */
+public class FileSystemMetadataProvider implements MetadataProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemMetadataProvider.class);
+
+ // Classpath resource of the XSD that the constructor loads into the validation Schema
+ private static final String METADATA_XSD = "/metadata.xsd";
+ // Context path passed to JAXBContext.newInstance; the package of the generated Buckets/Flow/Flows/Metadata classes
+ private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.metadata.generated";
+ // Single context from which this class creates its Marshaller and Unmarshaller instances
+ private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+ /**
+  * Creates the JAXBContext for the generated metadata classes.
+  *
+  * @return a context for the {@code JAXB_GENERATED_PATH} package, resolved with this class's class loader
+  * @throws RuntimeException if the context cannot be created; the originating JAXBException is
+  *         attached as the cause so the reason for the failure is not lost
+  */
+ private static JAXBContext initializeJaxbContext() {
+     try {
+         return JAXBContext.newInstance(JAXB_GENERATED_PATH, FileSystemMetadataProvider.class.getClassLoader());
+     } catch (JAXBException e) {
+         // this runs during static initialization, so keep the cause to make the failure diagnosable
+         throw new RuntimeException("Unable to create JAXBContext.", e);
+     }
+ }
+
+ // Name of the configuration property that onConfigured reads to locate the metadata file
+ static final String METADATA_FILE_PROP = "Metadata File";
+
+ // Assigned in onConfigured from the METADATA_FILE_PROP value; read and written by the (un)marshalling helpers
+ private File metadataFile;
+ // Built in the constructor from METADATA_XSD and set on every Marshaller/Unmarshaller this class creates
+ private Schema metadataSchema;
+ // Holds the current MetadataHolder; replaced with a new instance after each successful save
+ private final AtomicReference<MetadataHolder> metadataHolder = new AtomicReference<>(null);
+
+ /**
+  * Creates the provider and loads the XML schema used to validate the metadata file.
+  *
+  * @throws ProviderCreationException if the schema at {@code METADATA_XSD} cannot be parsed
+  */
+ public FileSystemMetadataProvider() throws ProviderCreationException {
+     final SchemaFactory xsdFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+     final java.net.URL xsdLocation = FileSystemMetadataProvider.class.getResource(METADATA_XSD);
+
+     try {
+         metadataSchema = xsdFactory.newSchema(xsdLocation);
+     } catch (SAXException e) {
+         throw new ProviderCreationException("Unable to create MetadataProvider due to: " + e.getMessage(), e);
+     }
+ }
+
+ /**
+  * Configures the provider from the {@code METADATA_FILE_PROP} property. An existing metadata file
+  * is loaded; otherwise a new file containing empty bucket and flow lists is written, creating
+  * the parent directory first when it is missing.
+  *
+  * @param configurationContext the context supplying the provider properties
+  * @throws ProviderCreationException if the property is missing or blank, the parent directory
+  *         cannot be created, or the metadata cannot be loaded or written
+  */
+ @Override
+ public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+     final Map<String,String> config = configurationContext.getProperties();
+     if (!config.containsKey(METADATA_FILE_PROP)) {
+         throw new ProviderCreationException("The property " + METADATA_FILE_PROP + " must be provided");
+     }
+
+     final String metadataFileValue = config.get(METADATA_FILE_PROP);
+     if (StringUtils.isBlank(metadataFileValue)) {
+         throw new ProviderCreationException("The property " + METADATA_FILE_PROP + " cannot be null or blank");
+     }
+
+     try {
+         metadataFile = new File(metadataFileValue);
+
+         final Metadata metadata;
+         if (metadataFile.exists()) {
+             LOGGER.info("Loading metadata file from {}", metadataFile.getAbsolutePath());
+             metadata = unmarshallMetadata();
+         } else {
+             LOGGER.info("Creating new metadata file at {}", metadataFile.getAbsolutePath());
+
+             // writing the file fails when its directory is missing, so create the directory up front
+             final File parentDir = metadataFile.getAbsoluteFile().getParentFile();
+             if (parentDir != null && !parentDir.mkdirs() && !parentDir.isDirectory()) {
+                 throw new ProviderCreationException("Unable to create directory " + parentDir.getAbsolutePath()
+                         + " for the metadata file");
+             }
+
+             metadata = new Metadata();
+             metadata.setBuckets(new Buckets());
+             metadata.setFlows(new Flows());
+             saveMetadata(metadata);
+         }
+
+         metadataHolder.set(new MetadataHolder(metadata));
+     } catch (ProviderCreationException e) {
+         // already carries a specific message; do not wrap it a second time
+         throw e;
+     } catch (Exception e) {
+         throw new ProviderCreationException("Unable to configure MetadataProvider due to: " + e.getMessage(), e);
+     }
+ }
+
+ private Metadata unmarshallMetadata() throws JAXBException {
+ final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+ unmarshaller.setSchema(metadataSchema);
+
+ final JAXBElement<Metadata> element = unmarshaller.unmarshal(new StreamSource(metadataFile), Metadata.class);
+ return element.getValue();
+ }
+
+ /**
+  * Writes the given metadata to the metadata file as formatted XML, validating against the schema.
+  * The XML is first marshalled to a temporary file next to the target and only moved over the
+  * target once marshalling has completed, so a validation or I/O failure part-way through does
+  * not leave a truncated metadata file behind.
+  *
+  * @param metadata the metadata to persist
+  * @throws JAXBException if marshalling fails or the temporary file cannot be moved into place
+  */
+ private void saveMetadata(final Metadata metadata) throws JAXBException {
+     final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+     marshaller.setSchema(metadataSchema);
+     marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+
+     // keep the temporary file in the same directory as the target so the move stays on one file system
+     final File target = metadataFile.getAbsoluteFile();
+     final File tempFile = new File(target.getParentFile(), target.getName() + ".tmp");
+
+     boolean replaced = false;
+     try {
+         marshaller.marshal(metadata, tempFile);
+         java.nio.file.Files.move(tempFile.toPath(), target.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+         replaced = true;
+     } catch (java.io.IOException e) {
+         throw new JAXBException("Unable to replace metadata file at " + target.getPath(), e);
+     } finally {
+         // on any failure remove the partial output; the previous metadata file is left untouched
+         if (!replaced && tempFile.exists() && !tempFile.delete()) {
+             LOGGER.warn("Unable to remove temporary metadata file at {}", tempFile.getAbsolutePath());
+         }
+     }
+ }
+
+ private synchronized void saveAndRefresh(final Metadata metadata) {
+ try {
+ saveMetadata(metadata);
+ metadataHolder.set(new MetadataHolder(metadata));
+ } catch (JAXBException e) {
+ throw new MetadataProviderException("Unable to save metadata", e);
+ }
+ }
+
+ /**
+  * Adds a bucket to the metadata and persists it.
+  *
+  * @param bucket the bucket to create
+  * @return the bucket found under the same identifier after the metadata has been refreshed
+  * @throws IllegalArgumentException if the bucket is null
+  * @throws MetadataProviderException if the metadata cannot be saved; in that case the bucket is
+  *         removed from the in-memory model again so later saves are not affected by it
+  */
+ @Override
+ public synchronized Bucket createBucket(final Bucket bucket) {
+     if (bucket == null) {
+         throw new IllegalArgumentException("Bucket cannot be null");
+     }
+
+     final org.apache.nifi.registry.metadata.generated.Bucket jaxbBucket = new org.apache.nifi.registry.metadata.generated.Bucket();
+     jaxbBucket.setIdentifier(bucket.getIdentifier());
+     jaxbBucket.setName(bucket.getName());
+     jaxbBucket.setDescription(bucket.getDescription());
+     jaxbBucket.setCreatedTimestamp(bucket.getCreatedTimestamp());
+
+     final Metadata metadata = metadataHolder.get().getMetadata();
+     metadata.getBuckets().getBucket().add(jaxbBucket);
+
+     try {
+         saveAndRefresh(metadata);
+     } catch (RuntimeException e) {
+         // the Metadata instance is shared with the current holder, so undo the change when it was not persisted
+         metadata.getBuckets().getBucket().remove(jaxbBucket);
+         throw e;
+     }
+
+     return metadataHolder.get().getBucketsBydId().get(bucket.getIdentifier());
+ }
+
+ /**
+  * Looks up a bucket by identifier in the current metadata holder.
+  *
+  * @param bucketIdentifier the identifier of the bucket to retrieve
+  * @return the matching bucket, or null when the holder has no entry for the identifier
+  * @throws IllegalArgumentException if the identifier is null
+  */
+ @Override
+ public Bucket getBucket(final String bucketIdentifier) {
+     if (bucketIdentifier == null) {
+         throw new IllegalArgumentException("Bucket Identifier cannot be null");
+     }
+
+     return metadataHolder.get().getBucketsBydId().get(bucketIdentifier);
+ }
+
+ /**
+  * Returns all buckets known to the current metadata holder.
+  *
+  * @return a new set containing the holder's buckets
+  */
+ @Override
+ public Set<Bucket> getBuckets() {
+     final MetadataHolder current = metadataHolder.get();
+     return new HashSet<>(current.getBucketsBydId().values());
+ }
+
+ /**
+  * Updates the name and description of an existing bucket and persists the change.
+  *
+  * @param bucket the bucket carrying the identifier to match and the new name and description
+  * @return the updated bucket from the refreshed metadata, or null when no bucket has the given identifier
+  * @throws IllegalArgumentException if the bucket or its identifier is null
+  * @throws MetadataProviderException if the metadata cannot be saved; in that case the previous
+  *         name and description are restored in the in-memory model
+  */
+ @Override
+ public synchronized Bucket updateBucket(final Bucket bucket) {
+     if (bucket == null) {
+         throw new IllegalArgumentException("Bucket cannot be null");
+     }
+
+     final String bucketIdentifier = bucket.getIdentifier();
+     if (bucketIdentifier == null) {
+         throw new IllegalArgumentException("Bucket Identifier cannot be null");
+     }
+
+     final MetadataHolder holder = metadataHolder.get();
+     final Buckets buckets = holder.getMetadata().getBuckets();
+
+     final org.apache.nifi.registry.metadata.generated.Bucket jaxbBucket = buckets.getBucket().stream()
+             .filter(b -> bucketIdentifier.equals(b.getIdentifier()))
+             .findFirst()
+             .orElse(null);
+
+     if (jaxbBucket == null) {
+         return null;
+     }
+
+     // remember the current values so a failed save does not leave the shared model half-updated
+     final String previousName = jaxbBucket.getName();
+     final String previousDescription = jaxbBucket.getDescription();
+
+     jaxbBucket.setName(bucket.getName());
+     jaxbBucket.setDescription(bucket.getDescription());
+
+     try {
+         saveAndRefresh(holder.getMetadata());
+     } catch (RuntimeException e) {
+         jaxbBucket.setName(previousName);
+         jaxbBucket.setDescription(previousDescription);
+         throw e;
+     }
+
+     return metadataHolder.get().getBucketsBydId().get(bucketIdentifier);
+ }
+
+ /**
+  * Removes a bucket together with every flow that references it, persisting the metadata
+  * only when something was actually removed.
+  *
+  * @param bucketIdentifier the identifier of the bucket to delete
+  * @throws IllegalArgumentException if the identifier is null
+  */
+ @Override
+ public synchronized void deleteBucket(final String bucketIdentifier) {
+     if (bucketIdentifier == null) {
+         throw new IllegalArgumentException("Bucket Identifier cannot be null");
+     }
+
+     final Metadata metadata = metadataHolder.get().getMetadata();
+     final Flows allFlows = metadata.getFlows();
+     final Buckets allBuckets = metadata.getBuckets();
+
+     // drop every flow that belongs to the bucket
+     boolean removedFlow = false;
+     for (final Iterator<Flow> flows = allFlows.getFlow().iterator(); flows.hasNext();) {
+         if (flows.next().getBucketIdentifier().equals(bucketIdentifier)) {
+             flows.remove();
+             removedFlow = true;
+         }
+     }
+
+     // drop the bucket itself; only the first match is removed
+     boolean removedBucket = false;
+     for (final Iterator<org.apache.nifi.registry.metadata.generated.Bucket> candidates = allBuckets.getBucket().iterator();
+             candidates.hasNext() && !removedBucket;) {
+         if (candidates.next().getIdentifier().equals(bucketIdentifier)) {
+             candidates.remove();
+             removedBucket = true;
+         }
+     }
+
+     if (removedFlow || removedBucket) {
+         saveAndRefresh(metadata);
+     }
+ }
+
+ /**
+  * Adds a versioned flow to an existing bucket and persists it.
+  *
+  * @param bucketIdentifier the identifier of the bucket the flow is created in
+  * @param versionedFlow the flow to create
+  * @return the flow found under the same identifier after the metadata has been refreshed
+  * @throws IllegalArgumentException if the bucket identifier or the flow is null
+  * @throws IllegalStateException if no bucket exists with the given identifier
+  * @throws MetadataProviderException if the metadata cannot be saved; in that case the flow is
+  *         removed from the in-memory model again so later saves are not affected by it
+  */
+ @Override
+ public synchronized VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) {
+     if (bucketIdentifier == null) {
+         // only null is rejected here, so say so (matches the other identifier checks in this class)
+         throw new IllegalArgumentException("Bucket Identifier cannot be null");
+     }
+
+     if (versionedFlow == null) {
+         throw new IllegalArgumentException("Versioned Flow cannot be null");
+     }
+
+     final MetadataHolder holder = metadataHolder.get();
+
+     final Bucket bucket = holder.getBucketsBydId().get(bucketIdentifier);
+     if (bucket == null) {
+         throw new IllegalStateException("Unable to create Versioned Flow because Bucket does not exist with id " + bucketIdentifier);
+     }
+
+     final Flow jaxbFlow = new Flow();
+     jaxbFlow.setIdentifier(versionedFlow.getIdentifier());
+     jaxbFlow.setName(versionedFlow.getName());
+     jaxbFlow.setDescription(versionedFlow.getDescription());
+     jaxbFlow.setCreatedTimestamp(versionedFlow.getCreatedTimestamp());
+     jaxbFlow.setModifiedTimestamp(versionedFlow.getModifiedTimestamp());
+     jaxbFlow.setBucketIdentifier(bucketIdentifier);
+
+     final Metadata metadata = holder.getMetadata();
+     metadata.getFlows().getFlow().add(jaxbFlow);
+
+     try {
+         saveAndRefresh(metadata);
+     } catch (RuntimeException e) {
+         // the Metadata instance is shared with the current holder, so undo the change when it was not persisted
+         metadata.getFlows().getFlow().remove(jaxbFlow);
+         throw e;
+     }
+
+     return metadataHolder.get().getFlowsById().get(versionedFlow.getIdentifier());
+ }
+
+ /**
+  * Looks up a versioned flow by identifier in the current metadata holder.
+  *
+  * @param flowIdentifier the identifier of the flow to retrieve
+  * @return the matching flow, or null when the holder has no entry for the identifier
+  * @throws IllegalArgumentException if the identifier is null
+  */
+ @Override
+ public VersionedFlow getFlow(final String flowIdentifier) {
+     if (flowIdentifier == null) {
+         throw new IllegalArgumentException("Flow Identifier cannot be null");
+     }
+
+     return metadataHolder.get().getFlowsById().get(flowIdentifier);
+ }
+
+ /**
+  * Returns all versioned flows known to the current metadata holder.
+  *
+  * @return a new set containing the holder's flows
+  */
+ @Override
+ public Set<VersionedFlow> getFlows() {
+     final MetadataHolder current = metadataHolder.get();
+     return new HashSet<>(current.getFlowsById().values());
+ }
+
+ /**
+  * Returns the versioned flows recorded for the given bucket in the current metadata holder.
+  *
+  * @param bucketId the identifier of the bucket whose flows are requested
+  * @return a new set with the bucket's flows, or an empty set when the holder has no entry for the bucket
+  */
+ @Override
+ public Set<VersionedFlow> getFlows(String bucketId) {
+     final Map<String,Set<VersionedFlow>> byBucket = metadataHolder.get().getFlowsByBucket();
+
+     if (!byBucket.containsKey(bucketId)) {
+         return Collections.emptySet();
+     }
+
+     // copy so callers cannot modify the holder's set
+     return new HashSet<>(byBucket.get(bucketId));
+ }
+
+ /**
+  * Updates the name and description of an existing flow, stamps it with the current time as its
+  * modified timestamp, and persists the change.
+  *
+  * @param versionedFlow the flow carrying the identifier to match and the new name and description
+  * @return the updated flow from the refreshed metadata, or null when no flow has the given identifier
+  * @throws IllegalArgumentException if the flow or its identifier is null
+  * @throws MetadataProviderException if the metadata cannot be saved; in that case the previous
+  *         values are restored in the in-memory model
+  */
+ @Override
+ public synchronized VersionedFlow updateFlow(final VersionedFlow versionedFlow) {
+     if (versionedFlow == null) {
+         throw new IllegalArgumentException("Versioned Flow cannot be null");
+     }
+
+     final String flowIdentifier = versionedFlow.getIdentifier();
+     if (flowIdentifier == null) {
+         throw new IllegalArgumentException("Flow Identifier cannot be null");
+     }
+
+     final MetadataHolder holder = metadataHolder.get();
+     final Flows flows = holder.getMetadata().getFlows();
+
+     final Flow jaxbFlow = flows.getFlow().stream()
+             .filter(f -> flowIdentifier.equals(f.getIdentifier()))
+             .findFirst()
+             .orElse(null);
+
+     if (jaxbFlow == null) {
+         return null;
+     }
+
+     // remember the current values so a failed save does not leave the shared model half-updated
+     final String previousName = jaxbFlow.getName();
+     final String previousDescription = jaxbFlow.getDescription();
+     final Long previousModified = jaxbFlow.getModifiedTimestamp();
+
+     // TODO should we allow changing the bucket id here, if so it needs to be passed in
+     jaxbFlow.setName(versionedFlow.getName());
+     jaxbFlow.setDescription(versionedFlow.getDescription());
+     jaxbFlow.setModifiedTimestamp(System.currentTimeMillis());
+
+     try {
+         saveAndRefresh(holder.getMetadata());
+     } catch (RuntimeException e) {
+         jaxbFlow.setName(previousName);
+         jaxbFlow.setDescription(previousDescription);
+         jaxbFlow.setModifiedTimestamp(previousModified);
+         throw e;
+     }
+
+     return metadataHolder.get().getFlowsById().get(flowIdentifier);
+ }
+
+ /**
+  * Removes the first flow with the given identifier and persists the metadata; nothing is
+  * written when no flow matches.
+  *
+  * @param flowIdentifier the identifier of the flow to delete
+  * @throws IllegalArgumentException if the identifier is null
+  */
+ @Override
+ public synchronized void deleteFlow(final String flowIdentifier) {
+     if (flowIdentifier == null) {
+         throw new IllegalArgumentException("Flow Identifier cannot be null");
+     }
+
+     final Metadata metadata = metadataHolder.get().getMetadata();
+
+     for (final Iterator<Flow> candidates = metadata.getFlows().getFlow().iterator(); candidates.hasNext();) {
+         if (candidates.next().getIdentifier().equals(flowIdentifier)) {
+             candidates.remove();
+             saveAndRefresh(metadata);
+             return;
+         }
+     }
+ }
+
+ /**
+  * Records a new snapshot (version, comments, created timestamp) on an existing flow and persists it.
+  *
+  * @param flowSnapshot the snapshot to record; its flow identifier selects the owning flow
+  * @return the snapshot for the same version, read from the flow in the refreshed metadata
+  * @throws IllegalArgumentException if the snapshot or its flow identifier is null
+  * @throws IllegalStateException if no flow exists with the snapshot's flow identifier
+  * @throws MetadataProviderException if the metadata cannot be saved; in that case the snapshot is
+  *         removed from the in-memory model again so later saves are not affected by it
+  */
+ @Override
+ public synchronized VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) {
+     if (flowSnapshot == null) {
+         throw new IllegalArgumentException("Versioned Flow Snapshot cannot be null");
+     }
+
+     final String flowIdentifier = flowSnapshot.getFlowIdentifier();
+     if (flowIdentifier == null) {
+         throw new IllegalArgumentException("Flow Identifier cannot be null");
+     }
+
+     final int snapshotVersion = flowSnapshot.getVersion();
+
+     final MetadataHolder holder = metadataHolder.get();
+     final Flows flows = holder.getMetadata().getFlows();
+
+     final Flow jaxbFlow = flows.getFlow().stream()
+             .filter(f -> flowIdentifier.equals(f.getIdentifier()))
+             .findFirst()
+             .orElse(null);
+
+     if (jaxbFlow == null) {
+         throw new IllegalStateException("Unable to create snapshot because Versioned Flow does not exist for id " + flowIdentifier);
+     }
+
+     final Flow.Snapshot jaxbSnapshot = new Flow.Snapshot();
+     jaxbSnapshot.setVersion(snapshotVersion);
+     jaxbSnapshot.setComments(flowSnapshot.getComments());
+     jaxbSnapshot.setCreatedTimestamp(flowSnapshot.getTimestamp());
+
+     jaxbFlow.getSnapshot().add(jaxbSnapshot);
+
+     try {
+         saveAndRefresh(holder.getMetadata());
+     } catch (RuntimeException e) {
+         // the Metadata instance is shared with the current holder, so undo the change when it was not persisted
+         jaxbFlow.getSnapshot().remove(jaxbSnapshot);
+         throw e;
+     }
+
+     final VersionedFlow versionedFlow = metadataHolder.get().getFlowsById().get(flowIdentifier);
+     return versionedFlow.getSnapshot(snapshotVersion);
+ }
+
+ /**
+  * Retrieves one snapshot of a flow from the current metadata holder.
+  *
+  * @param flowIdentifier the identifier of the flow that owns the snapshot
+  * @param version the snapshot version to retrieve
+  * @return null when no flow has the given identifier; otherwise whatever the flow returns for the version
+  * @throws IllegalArgumentException if the flow identifier or the version is null
+  */
+ @Override
+ public VersionedFlowSnapshot getFlowSnapshot(final String flowIdentifier, final Integer version) {
+     if (flowIdentifier == null) {
+         throw new IllegalArgumentException("Flow Identifier cannot be null");
+     }
+
+     if (version == null) {
+         throw new IllegalArgumentException("Version cannot be null");
+     }
+
+     final VersionedFlow owningFlow = metadataHolder.get().getFlowsById().get(flowIdentifier);
+     return owningFlow == null ? null : owningFlow.getSnapshot(version);
+ }
+
+ @Override
+ public synchronized void deleteFlowSnapshot(final String flowIdentifier, final Integer version) {
+ if (flowIdentifier == null) {
+ throw new IllegalArgumentException("Flow Identifier cannot be null");
+ }
+
+ if (version == null) {
+ throw new IllegalArgumentException("Version cannot be null");
+ }
+
+ final MetadataHolder holder = metadataHolder.get();
+ final Flows flows = holder.getMetadata().getFlows();
+
+ final Flow jaxbFlow = flows.getFlow().stream()
+ .filter(f -> flowIdentifier.equals(f.getIdentifier()))
+ .findFirst()
+ .orElse(null);
+
+ if (jaxbFlow == null) {
+ return;
+ }
+
+ boolean deletedSnapshot = false;
+ final Iterator<Flow.Snapshot> snapshotIterator = jaxbFlow.getSnapshot().iterator();
+
+ while (snapshotIterator.hasNext()) {
+ final Flow.Snapshot snapshot = snapshotIterator.next();
+ if (snapshot.getVersion().equals(version)) {
+ snapshotIterator.remove();
+ deletedSnapshot = true;
+ break;
+ }
+ }
+
+ if (deletedSnapshot) {
+ saveAndRefresh(holder.getMetadata());
+ }
+ }
+
+}