You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2017/08/22 19:50:58 UTC

[2/5] nifi-registry git commit: NIFIREG-7 Defining Provider API and framework for loading providers - Renaming nifi-registry-flow-data-model to nifi-registry-data-model - Implementing FileSystemFlowProvider & FileSystemMetadataProvider - Adding unit test

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
new file mode 100644
index 0000000..8f186fd
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Standard configuration context to be passed to onConfigured method of Providers.
+ */
+public class StandardProviderConfigurationContext implements ProviderConfigurationContext {
+
+    private final Map<String,String> properties;
+
+    public StandardProviderConfigurationContext(final Map<String, String> properties) {
+        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
new file mode 100644
index 0000000..1a83d68
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.metadata.MetadataProvider;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.apache.nifi.registry.provider.generated.Property;
+import org.apache.nifi.registry.provider.generated.Providers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Standard implementation of ProviderFactory.
+ */
+public class StandardProviderFactory implements ProviderFactory {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(StandardProviderFactory.class);
+
+    private static final String PROVIDERS_XSD = "/providers.xsd";
+    private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.provider.generated";
+    private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    /**
+     * Load the JAXBContext.
+     */
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_GENERATED_PATH, StandardProviderFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.");
+        }
+    }
+
+    private final NiFiRegistryProperties properties;
+    private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null);
+
+    private FlowPersistenceProvider flowPersistenceProvider;
+    private MetadataProvider metadataProvider;
+
+    public StandardProviderFactory(final NiFiRegistryProperties properties) {
+        this.properties = properties;
+
+        if (this.properties == null) {
+            throw new IllegalStateException("NiFiRegistryProperties cannot be null");
+        }
+    }
+
+    @Override
+    public synchronized void initialize() throws ProviderFactoryException {
+        if (providersHolder.get() == null) {
+            final File providersConfigFile = properties.getProvidersConfigurationFile();
+            if (providersConfigFile.exists()) {
+                try {
+                    // find the schema
+                    final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+                    final Schema schema = schemaFactory.newSchema(StandardProviderFactory.class.getResource(PROVIDERS_XSD));
+
+                    // attempt to unmarshal
+                    final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+                    unmarshaller.setSchema(schema);
+
+                    // set the holder for later use
+                    final JAXBElement<Providers> element = unmarshaller.unmarshal(new StreamSource(providersConfigFile), Providers.class);
+                    providersHolder.set(element.getValue());
+                } catch (SAXException | JAXBException e) {
+                    throw new ProviderFactoryException("Unable to load the providers configuration file at: " + providersConfigFile.getAbsolutePath(), e);
+                }
+            } else {
+                throw new ProviderFactoryException("Unable to find the providers configuration file at " + providersConfigFile.getAbsolutePath());
+            }
+        }
+    }
+
+    @Override
+    public synchronized MetadataProvider getMetadataProvider() {
+        if (metadataProvider == null) {
+            if (providersHolder.get() == null) {
+                throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider");
+            }
+
+            final Providers providers = providersHolder.get();
+            final org.apache.nifi.registry.provider.generated.Provider jaxbMetadataProvider = providers.getMetadataProvider();
+            final String metadataProviderClassName = jaxbMetadataProvider.getClazz();
+
+            try {
+                final Class<?> rawMetadataProviderClass = Class.forName(metadataProviderClassName, true, StandardProviderFactory.class.getClassLoader());
+                final Class<? extends MetadataProvider> metadataProviderClass = rawMetadataProviderClass.asSubclass(MetadataProvider.class);
+
+                // otherwise create a new instance
+                final Constructor constructor = metadataProviderClass.getConstructor();
+                metadataProvider = (MetadataProvider) constructor.newInstance();
+
+                LOGGER.info("Instantiated MetadataProvider with class name {}", new Object[] {metadataProviderClassName});
+            } catch (Exception e) {
+                throw new ProviderFactoryException("Error creating MetadataProvider with class name: " + metadataProviderClassName, e);
+            }
+
+            final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbMetadataProvider.getProperty());
+            metadataProvider.onConfigured(configurationContext);
+            LOGGER.info("Configured MetadataProvider with class name {}", new Object[] {metadataProviderClassName});
+        }
+
+        return metadataProvider;
+    }
+
+    @Override
+    public synchronized FlowPersistenceProvider getFlowPersistenceProvider() {
+        if (flowPersistenceProvider == null) {
+            if (providersHolder.get() == null) {
+                throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider");
+            }
+
+            final Providers providers = providersHolder.get();
+            final org.apache.nifi.registry.provider.generated.Provider jaxbFlowProvider = providers.getFlowPersistenceProvider();
+            final String flowProviderClassName = jaxbFlowProvider.getClazz();
+
+            try {
+                final Class<?> rawFlowProviderClass = Class.forName(flowProviderClassName, true, StandardProviderFactory.class.getClassLoader());
+                final Class<? extends FlowPersistenceProvider> flowProviderClass = rawFlowProviderClass.asSubclass(FlowPersistenceProvider.class);
+
+                final Constructor constructor = flowProviderClass.getConstructor();
+                flowPersistenceProvider = (FlowPersistenceProvider) constructor.newInstance();
+
+                LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName});
+            } catch (Exception e) {
+                throw new ProviderFactoryException("Error creating FlowPersistenceProvider with class name: " + flowProviderClassName, e);
+            }
+
+            final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbFlowProvider.getProperty());
+            flowPersistenceProvider.onConfigured(configurationContext);
+            LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName});
+        }
+
+        return flowPersistenceProvider;
+    }
+
+    private ProviderConfigurationContext createConfigurationContext(final List<Property> configProperties) {
+        final Map<String,String> properties = new HashMap<>();
+
+        if (configProperties != null) {
+            configProperties.stream().forEach(p -> properties.put(p.getName(), p.getValue()));
+        }
+
+        return new StandardProviderConfigurationContext(properties);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/main/xsd/providers.xsd
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/xsd/providers.xsd b/nifi-registry-framework/src/main/xsd/providers.xsd
new file mode 100644
index 0000000..cb71ed8
--- /dev/null
+++ b/nifi-registry-framework/src/main/xsd/providers.xsd
@@ -0,0 +1,51 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
+
+    <!-- Provider type -->
+    <xs:complexType name="Provider">
+        <xs:sequence>
+            <xs:element name="class" type="NonEmptyStringType"/>
+            <xs:element name="property" type="Property" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <!-- Name/Value properties-->
+    <xs:complexType name="Property">
+        <xs:simpleContent>
+            <xs:extension base="xs:string">
+                <xs:attribute name="name" type="NonEmptyStringType"></xs:attribute>
+            </xs:extension>
+        </xs:simpleContent>
+    </xs:complexType>
+
+    <xs:simpleType name="NonEmptyStringType">
+        <xs:restriction base="xs:string">
+            <xs:minLength value="1"/>
+        </xs:restriction>
+    </xs:simpleType>
+
+    <!-- providers -->
+    <xs:element name="providers">
+        <xs:complexType>
+            <xs:sequence>
+                <xs:element name="metadataProvider" type="Provider" minOccurs="1" maxOccurs="1"/>
+                <xs:element name="flowPersistenceProvider" type="Provider" minOccurs="1" maxOccurs="1" />
+            </xs:sequence>
+        </xs:complexType>
+    </xs:element>
+
+</xs:schema>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java
new file mode 100644
index 0000000..e0c7f16
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/flow/TestStandardFlowSnapshotContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestStandardFlowSnapshotContext {
+
+    @Test
+    public void testBuilder() {
+        final String bucketId = "1234-1234-1234-1234";
+        final String bucketName = "Some Bucket";
+        final String flowId = "2345-2345-2345-2345";
+        final String flowName = "Some Flow";
+        final int version = 2;
+        final String comments = "Some Comments";
+        final long timestamp = System.currentTimeMillis();
+
+        final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder()
+                .bucketId(bucketId)
+                .bucketName(bucketName)
+                .flowId(flowId)
+                .flowName(flowName)
+                .version(version)
+                .comments(comments)
+                .snapshotTimestamp(timestamp)
+                .build();
+
+        Assert.assertEquals(bucketId, context.getBucketId());
+        Assert.assertEquals(bucketName, context.getBucketName());
+        Assert.assertEquals(flowId, context.getFlowId());
+        Assert.assertEquals(flowName, context.getFlowName());
+        Assert.assertEquals(version, context.getVersion());
+        Assert.assertEquals(comments, context.getComments());
+        Assert.assertEquals(timestamp, context.getSnapshotTimestamp());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java
new file mode 100644
index 0000000..2eed54f
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockFlowPersistenceProvider.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+
+import java.util.Map;
+
+public class MockFlowPersistenceProvider implements FlowPersistenceProvider {
+
+    private Map<String,String> properties;
+
+    @Override
+    public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        properties = configurationContext.getProperties();
+    }
+
+    @Override
+    public void saveSnapshot(FlowSnapshotContext context, byte[] content) throws FlowPersistenceException {
+
+    }
+
+    @Override
+    public byte[] getSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException {
+        return new byte[0];
+    }
+
+    @Override
+    public void deleteSnapshots(String bucketId, String flowId) throws FlowPersistenceException {
+
+    }
+
+    @Override
+    public void deleteSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException {
+
+    }
+
+    public Map<String,String> getProperties() {
+        return properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
new file mode 100644
index 0000000..06d3e1a
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.metadata.MetadataProvider;
+
+import java.util.Map;
+import java.util.Set;
+
+public class MockMetadataProvider implements MetadataProvider {
+
+    private Map<String,String> properties;
+
+    @Override
+    public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        this.properties = configurationContext.getProperties();
+    }
+
+    public Map<String,String> getProperties() {
+        return properties;
+    }
+
+    @Override
+    public Bucket createBucket(Bucket bucket) {
+        return null;
+    }
+
+    @Override
+    public Bucket getBucket(String bucketIdentifier) {
+        return null;
+    }
+
+    @Override
+    public Set<Bucket> getBuckets() {
+        return null;
+    }
+
+    @Override
+    public Bucket updateBucket(Bucket bucket) {
+        return null;
+    }
+
+    @Override
+    public void deleteBucket(String bucketIdentifier) {
+
+    }
+
+    @Override
+    public VersionedFlow createFlow(String bucketIdentifier, VersionedFlow flow) {
+        return null;
+    }
+
+    @Override
+    public VersionedFlow getFlow(String flowIdentifier) {
+        return null;
+    }
+
+    @Override
+    public Set<VersionedFlow> getFlows() {
+        return null;
+    }
+
+    @Override
+    public Set<VersionedFlow> getFlows(String bucketId) {
+        return null;
+    }
+
+    @Override
+    public VersionedFlow updateFlow(VersionedFlow versionedFlow) {
+        return null;
+    }
+
+    @Override
+    public void deleteFlow(String flowIdentifier) {
+
+    }
+
+    @Override
+    public VersionedFlowSnapshot createFlowSnapshot(VersionedFlowSnapshot flowSnapshot) {
+        return null;
+    }
+
+    @Override
+    public VersionedFlowSnapshot getFlowSnapshot(String flowIdentifier, Integer version) {
+        return null;
+    }
+
+    @Override
+    public void deleteFlowSnapshot(String flowIdentifier, Integer version) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
new file mode 100644
index 0000000..2bec5ba
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.metadata.MetadataProvider;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class TestStandardProviderFactory {
+
+    @Test
+    public void testGetProvidersSuccess() {
+        final NiFiRegistryProperties props = new NiFiRegistryProperties();
+        props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-good.xml");
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props);
+        providerFactory.initialize();
+
+        final MetadataProvider metadataProvider = providerFactory.getMetadataProvider();
+        assertNotNull(metadataProvider);
+        assertTrue(metadataProvider instanceof MockMetadataProvider);
+
+        final MockMetadataProvider mockMetadataProvider = (MockMetadataProvider) metadataProvider;
+        assertNotNull(mockMetadataProvider.getProperties());
+        assertEquals("metadata foo", mockMetadataProvider.getProperties().get("Metadata Property 1"));
+        assertEquals("metadata bar", mockMetadataProvider.getProperties().get("Metadata Property 2"));
+
+        final FlowPersistenceProvider flowPersistenceProvider = providerFactory.getFlowPersistenceProvider();
+        assertNotNull(flowPersistenceProvider);
+
+        final MockFlowPersistenceProvider mockFlowProvider = (MockFlowPersistenceProvider) flowPersistenceProvider;
+        assertNotNull(mockFlowProvider.getProperties());
+        assertEquals("flow foo", mockFlowProvider.getProperties().get("Flow Property 1"));
+        assertEquals("flow bar", mockFlowProvider.getProperties().get("Flow Property 2"));
+    }
+
+    @Test(expected = ProviderFactoryException.class)
+    public void testGetMetadataProviderBeforeInitializingShouldThrowException() {
+        final NiFiRegistryProperties props = new NiFiRegistryProperties();
+        props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-good.xml");
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props);
+        providerFactory.getMetadataProvider();
+    }
+
+    @Test(expected = ProviderFactoryException.class)
+    public void testGetFlowProviderBeforeInitializingShouldThrowException() {
+        final NiFiRegistryProperties props = new NiFiRegistryProperties();
+        props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-good.xml");
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props);
+        providerFactory.getFlowPersistenceProvider();
+    }
+
+    @Test(expected = ProviderFactoryException.class)
+    public void testProvidersConfigDoesNotExist() {
+        final NiFiRegistryProperties props = new NiFiRegistryProperties();
+        props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-does-not-exist.xml");
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props);
+        providerFactory.initialize();
+    }
+
+    @Test(expected = ProviderFactoryException.class)
+    public void testMetadataProviderClassNotFound() {
+        final NiFiRegistryProperties props = new NiFiRegistryProperties();
+        props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-class-not-found.xml");
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props);
+        providerFactory.initialize();
+
+        providerFactory.getMetadataProvider();
+    }
+
+    @Test(expected = ProviderFactoryException.class)
+    public void testFlowProviderClassNotFound() {
+        final NiFiRegistryProperties props = new NiFiRegistryProperties();
+        props.setProperty(NiFiRegistryProperties.PROVIDERS_CONFIGURATION_FILE, "src/test/resources/provider/providers-class-not-found.xml");
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props);
+        providerFactory.initialize();
+
+        providerFactory.getFlowPersistenceProvider();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml b/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml
new file mode 100644
index 0000000..8b5debe
--- /dev/null
+++ b/nifi-registry-framework/src/test/resources/provider/providers-class-not-found.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<providers>
+
+    <metadataProvider>
+        <class>org.apache.nifi.registry.provider.MetadataProviderXXX</class>
+        <property name="Metadata Property 1">foo</property>
+        <property name="Metadata Property 2">bar</property>
+    </metadataProvider>
+
+    <flowPersistenceProvider>
+        <class>org.apache.nifi.registry.provider.FlowProviderXXX</class>
+        <property name="Flow Property 1">foo</property>
+        <property name="Flow Property 2">bar</property>
+    </flowPersistenceProvider>
+
+</providers>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-framework/src/test/resources/provider/providers-good.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/resources/provider/providers-good.xml b/nifi-registry-framework/src/test/resources/provider/providers-good.xml
new file mode 100644
index 0000000..4ef2a06
--- /dev/null
+++ b/nifi-registry-framework/src/test/resources/provider/providers-good.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<providers>
+
+    <metadataProvider>
+        <class>org.apache.nifi.registry.provider.MockMetadataProvider</class>
+        <property name="Metadata Property 1">metadata foo</property>
+        <property name="Metadata Property 2">metadata bar</property>
+    </metadataProvider>
+
+    <flowPersistenceProvider>
+        <class>org.apache.nifi.registry.provider.MockFlowPersistenceProvider</class>
+        <property name="Flow Property 1">flow foo</property>
+        <property name="Flow Property 2">flow bar</property>
+    </flowPersistenceProvider>
+
+</providers>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java
----------------------------------------------------------------------
diff --git a/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java b/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java
index 5efea84..35a1b48 100644
--- a/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java
+++ b/nifi-registry-properties/src/main/java/org/apache/nifi/registry/properties/NiFiRegistryProperties.java
@@ -45,9 +45,12 @@ public class NiFiRegistryProperties extends Properties {
     public static final String SECURITY_NEED_CLIENT_AUTH = "nifi.registry.security.needClientAuth";
     public static final String SECURITY_AUTHORIZED_USERS = "nifi.registry.security.authorized.users";
 
+    public static final String PROVIDERS_CONFIGURATION_FILE = "nifi.registry.providers.configuration.file";
+
     // Defaults
     public static final String DEFAULT_WEB_WORKING_DIR = "./work/jetty";
     public static final String DEFAULT_WAR_DIR = "./lib";
+    public static final String DEFAULT_PROVIDERS_CONFIGURATION_FILE = "./conf/providers.xml";
 
     public int getWebThreads() {
         int webThreads = 200;
@@ -144,4 +147,12 @@ public class NiFiRegistryProperties extends Properties {
         return new File(authorizedUsersFile);
     }
 
+    public File getProvidersConfigurationFile() {
+        final String value = getProperty(PROVIDERS_CONFIGURATION_FILE);
+        if (StringUtils.isBlank(value)) {
+            return new File(DEFAULT_PROVIDERS_CONFIGURATION_FILE);
+        } else {
+            return new File(value);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/pom.xml b/nifi-registry-provider-api/pom.xml
new file mode 100644
index 0000000..eba6cfa
--- /dev/null
+++ b/nifi-registry-provider-api/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.nifi.registry</groupId>
+        <artifactId>nifi-registry</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>nifi-registry-provider-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi.registry</groupId>
+            <artifactId>nifi-registry-data-model</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java
new file mode 100644
index 0000000..4287fc8
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+/**
+ * An Exception for errors encountered when a FlowPersistenceProvider saves or retrieves a flow.
+ */
+public class FlowPersistenceException extends RuntimeException {
+
+    public FlowPersistenceException(String message) {
+        super(message);
+    }
+
+    public FlowPersistenceException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java
new file mode 100644
index 0000000..8648722
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowPersistenceProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.nifi.registry.provider.Provider;
+
+/**
+ * A service that can store and retrieve versioned flow snapshots.
+ *
+ * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may
+ * change across releases until the registry matures.
+ */
+public interface FlowPersistenceProvider extends Provider {
+
+    /**
+     * Persists a serialized versioned flow snapshot.
+     *
+     * @param context the context for the snapshot being persisted
+     * @param content the serialized snapshot to persist
+     * @throws FlowPersistenceException if the snapshot could not be persisted
+     */
+    void saveSnapshot(FlowSnapshotContext context, byte[] content) throws FlowPersistenceException;
+
+    /**
+     * Retrieves a versioned flow snapshot.
+     *
+     * @param bucketId the bucket id where the snapshot is located
+     * @param flowId the id of the versioned flow the snapshot belongs to
+     * @param version the version of the snapshot
+     * @return the bytes for the requested snapshot, or null if not found
+     * @throws FlowPersistenceException if the snapshot could not be retrieved due to an error in underlying provider
+     */
+    byte[] getSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException;
+
+    /**
+     * Deletes all snapshots for the versioned flow with the given id.
+     *
+     * @param bucketId the bucket the versioned flow belongs to
+     * @param flowId the id of the versioned flow
+     * @throws FlowPersistenceException if the snapshots could not be deleted due to an error in underlying provider
+     */
+    void deleteSnapshots(String bucketId, String flowId) throws FlowPersistenceException;
+
+    /**
+     * Deletes the given snapshot.
+     *
+     * @param bucketId the bucket id where the snapshot is located
+     * @param flowId the id of the versioned flow the snapshot belongs to
+     * @param version the version of the snapshot
+     * @throws FlowPersistenceException if the snapshot could not be deleted due to an error in underlying provider
+     */
+    void deleteSnapshot(String bucketId, String flowId, int version) throws FlowPersistenceException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
new file mode 100644
index 0000000..c5e06f5
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/flow/FlowSnapshotContext.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+/**
+ * The context that will be passed to the flow provider when saving a snapshot of a versioned flow.
+ */
+public interface FlowSnapshotContext {
+
+    /**
+     * @return the id of the bucket this snapshot belongs to
+     */
+    String getBucketId();
+
+    /**
+     * @return the name of the bucket this snapshot belongs to
+     */
+    String getBucketName();
+
+    /**
+     * @return the id of the versioned flow this snapshot belongs to
+     */
+    String getFlowId();
+
+    /**
+     * @return the name of the versioned flow this snapshot belongs to
+     */
+    String getFlowName();
+
+    /**
+     * @return the version of the snapshot
+     */
+    int getVersion();
+
+    /**
+     * @return the comments for the snapshot
+     */
+    String getComments();
+
+    /**
+     * @return the timestamp the snapshot was created
+     */
+    long getSnapshotTimestamp();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
new file mode 100644
index 0000000..558f7aa
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.metadata;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.provider.Provider;
+
+import java.util.Set;
+
+/**
+ * A service for managing metadata about all objects stored by the registry.
+ *
+ * NOTE: Although this interface is intended to be an extension point, it is not yet considered stable and thus may
+ * change across releases until the registry matures.
+ */
+public interface MetadataProvider extends Provider {
+
+    /**
+     * Creates the given bucket.
+     *
+     * @param bucket the bucket to create
+     * @return the created bucket
+     */
+    Bucket createBucket(Bucket bucket);
+
+    /**
+     * Retrieves the bucket with the given id.
+     *
+     * @param bucketIdentifier the id of the bucket to retrieve
+     * @return the bucket with the given id, or null if it does not exist
+     */
+    Bucket getBucket(String bucketIdentifier);
+
+    /**
+     * Updates the given bucket, only the name and description should be allowed to be updated.
+     *
+     * @param bucket the updated bucket to save
+     * @return the updated bucket, or null if no bucket with the given id exists
+     */
+    Bucket updateBucket(Bucket bucket);
+
+    /**
+     * Deletes the bucket with the given identifier if one exists.
+     *
+     * @param bucketIdentifier the id of the bucket to delete
+     */
+    void deleteBucket(String bucketIdentifier);
+
+    /**
+     * Retrieves all buckets known to this metadata provider.
+     *
+     * @return the set of all buckets
+     */
+    Set<Bucket> getBuckets();
+
+    /**
+     * Creates a versioned flow in the given bucket.
+     *
+     * @param bucketIdentifier the id of the bucket where the flow is being created
+     * @param flow the versioned flow to create
+     * @return the created versioned flow
+     * @throws IllegalStateException if no bucket with the given identifier exists
+     */
+    VersionedFlow createFlow(String bucketIdentifier, VersionedFlow flow);
+
+    /**
+     * Retrieves the versioned flow with the given id.
+     *
+     * @param flowIdentifier the identifier of the flow to retrieve
+     * @return the versioned flow with the given id, or null if no flow with the given id exists
+     */
+    VersionedFlow getFlow(String flowIdentifier);
+
+    /**
+     * Updates the given versioned flow, only the name and description should be allowed to be updated.
+     *
+     * @param versionedFlow the updated versioned flow to save
+     * @return the updated versioned flow
+     */
+    VersionedFlow updateFlow(VersionedFlow versionedFlow);
+
+    /**
+     * Deletes the versioned flow with the given identifier if one exists.
+     *
+     * @param flowIdentifier the id of the versioned flow to delete
+     */
+    void deleteFlow(String flowIdentifier);
+
+    /**
+     * Retrieves all versioned flows known to this metadata provider.
+     *
+     * @return the set of all versioned flows
+     */
+    Set<VersionedFlow> getFlows();
+
+    /**
+     * Retrieves all the versioned flows for the given bucket.
+     *
+     * @param bucketId the id of the bucket to retrieve flow for
+     * @return the set of versioned flows for the given bucket, or an empty set if none exist
+     */
+    Set<VersionedFlow> getFlows(String bucketId);
+
+    /**
+     * Creates a versioned flow snapshot.
+     *
+     * @param flowSnapshot the snapshot to create
+     * @return the created snapshot
+     * @throws IllegalStateException if the versioned flow specified by flowSnapshot.getFlowIdentifier() does not exist
+     */
+    VersionedFlowSnapshot createFlowSnapshot(VersionedFlowSnapshot flowSnapshot);
+
+    /**
+     * Retrieves the snapshot for the given flow identifier and snapshot version.
+     *
+     * @param flowIdentifier the identifier of the flow the snapshot belongs to
+     * @param version the version of the snapshot
+     * @return the versioned flow snapshot for the given flow identifier and version, or null if none exists
+     */
+    VersionedFlowSnapshot getFlowSnapshot(String flowIdentifier, Integer version);
+
+    /**
+     * Deletes the snapshot for the given flow identifier and version.
+     *
+     * @param flowIdentifier the identifier of the flow the snapshot belongs to
+     * @param version the version of the snapshot
+     */
+    void deleteFlowSnapshot(String flowIdentifier, Integer version);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java
new file mode 100644
index 0000000..bce9352
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProviderException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.metadata;
+
+/**
+ * An exception thrown when an error is encountered by a MetadataProvider.
+ */
+public class MetadataProviderException extends RuntimeException {
+
+    public MetadataProviderException(String message) {
+        super(message);
+    }
+
+    public MetadataProviderException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MetadataProviderException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java
new file mode 100644
index 0000000..4a4be28
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/Provider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+/**
+ * Base interface for providers.
+ */
+public interface Provider {
+
+    /**
+     * Called to configure the Provider.
+     *
+     * @param configurationContext the context containing configuration for the given provider
+     * @throws ProviderCreationException if an error occurs while the provider is configured
+     */
+    void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java
new file mode 100644
index 0000000..b4f7ed6
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderConfigurationContext.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.util.Map;
+
+/**
+ * A context that will passed to providers in order to obtain configuration.
+ */
+public interface ProviderConfigurationContext {
+
+    /**
+     * Retrieves all properties the provider currently understands regardless
+     * of whether a value has been set for them or not. If no value is present
+     * then its value is null and thus any registered default for the property
+     * descriptor applies.
+     *
+     * @return Map of all properties
+     */
+    Map<String, String> getProperties();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java
new file mode 100644
index 0000000..d1e106c
--- /dev/null
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderCreationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+/**
+ * An exception that will be thrown if a provider can not be created.
+ */
+public class ProviderCreationException extends RuntimeException {
+
+    public ProviderCreationException() {
+    }
+
+    public ProviderCreationException(String message) {
+        super(message);
+    }
+
+    public ProviderCreationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ProviderCreationException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-impl/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-impl/pom.xml b/nifi-registry-provider-impl/pom.xml
new file mode 100644
index 0000000..f5999af
--- /dev/null
+++ b/nifi-registry-provider-impl/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.nifi.registry</groupId>
+        <artifactId>nifi-registry</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>nifi-registry-provider-impl</artifactId>
+    <packaging>jar</packaging>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+            <resource>
+                <directory>src/main/xsd</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>jaxb2-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>current</id>
+                        <goals>
+                            <goal>xjc</goal>
+                        </goals>
+                        <configuration>
+                            <packageName>org.apache.nifi.registry.metadata.generated</packageName>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <configuration>
+                    <excludes>**/metadata/generated/*.java,</excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi.registry</groupId>
+            <artifactId>nifi-registry-provider-api</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.registry</groupId>
+            <artifactId>nifi-registry-utils</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.7.22</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java
new file mode 100644
index 0000000..fc14c14
--- /dev/null
+++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/flow/FileSystemFlowPersistenceProvider.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.flow;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.apache.nifi.registry.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * A FlowPersistenceProvider that uses the local filesystem for storage.
+ */
+public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvider {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(FileSystemFlowPersistenceProvider.class);
+
+    static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory";
+
+    static final String SNAPSHOT_EXTENSION = ".snapshot";
+
+    private File flowStorageDir;
+
+    @Override
+    public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        final Map<String,String> props = configurationContext.getProperties();
+        if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided");
+        }
+
+        final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP);
+        if (StringUtils.isBlank(flowStorageDirValue)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank");
+        }
+
+        try {
+            flowStorageDir = new File(flowStorageDirValue);
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir);
+            LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", new Object[] {flowStorageDir.getAbsolutePath()});
+        } catch (IOException e) {
+            throw new ProviderCreationException(e);
+        }
+    }
+
+    @Override
+    public synchronized void saveSnapshot(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException {
+        final File bucketDir = new File(flowStorageDir, context.getBucketId());
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(bucketDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing bucket directory at " + bucketDir.getAbsolutePath(), e);
+        }
+
+        final File flowDir = new File(bucketDir, context.getFlowId());
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing flow directory at " + flowDir.getAbsolutePath(), e);
+        }
+
+        final String versionString = String.valueOf(context.getVersion());
+        final File versionDir = new File(flowDir, versionString);
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(versionDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing version directory at " + versionDir.getAbsolutePath(), e);
+        }
+
+        final File versionFile = new File(versionDir, versionString + SNAPSHOT_EXTENSION);
+        if (versionFile.exists()) {
+            throw new FlowPersistenceException("Unable to save, a snapshot already exists with version " + versionString);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Saving snapshot with filename {}", new Object[] {versionFile.getAbsolutePath()});
+        }
+
+        try (final OutputStream out = new FileOutputStream(versionFile)) {
+            out.write(content);
+            out.flush();
+        } catch (Exception e) {
+            throw new FlowPersistenceException("Unable to write snapshot to disk due to " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public synchronized byte[] getSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Retrieving snapshot with filename {}", new Object[] {snapshotFile.getAbsolutePath()});
+        }
+
+        if (!snapshotFile.exists()) {
+            return null;
+        }
+
+        try (final InputStream in = new FileInputStream(snapshotFile)){
+            return IOUtils.toByteArray(in);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error reading snapshot file: " + snapshotFile.getAbsolutePath(), e);
+        }
+    }
+
+    @Override
+    public synchronized void deleteSnapshots(final String bucketId, final String flowId) throws FlowPersistenceException {
+        final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId);
+        if (!flowDir.exists()) {
+            LOGGER.debug("Snapshot directory does not exist at {}", new Object[] {flowDir.getAbsolutePath()});
+            return;
+        }
+
+        try {
+            org.apache.commons.io.FileUtils.cleanDirectory(flowDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error deleting snapshots at " + flowDir.getAbsolutePath(), e);
+        }
+    }
+
+    @Override
+    public synchronized void deleteSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        if (!snapshotFile.exists()) {
+            LOGGER.debug("Snapshot file does not exist at {}", new Object[] {snapshotFile.getAbsolutePath()});
+            return;
+        }
+
+        final boolean deleted = snapshotFile.delete();
+        if (!deleted) {
+            throw new FlowPersistenceException("Unable to delete snapshot at " + snapshotFile.getAbsolutePath());
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Deleted snapshot at {}", new Object[] {snapshotFile.getAbsolutePath()});
+        }
+    }
+
+    protected File getSnapshotFile(final String bucketId, final String flowId, final int version) {
+        final String snapshotFilename = bucketId + "/" + flowId + "/" + version + "/" + version + SNAPSHOT_EXTENSION;
+        return new File(flowStorageDir, snapshotFilename);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/9eb0cef0/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
new file mode 100644
index 0000000..ccdc1a8
--- /dev/null
+++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.metadata;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.metadata.generated.Buckets;
+import org.apache.nifi.registry.metadata.generated.Flow;
+import org.apache.nifi.registry.metadata.generated.Flows;
+import org.apache.nifi.registry.metadata.generated.Metadata;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * A MetadataProvider that persists metadata to the local filesystem.
+ */
+public class FileSystemMetadataProvider implements MetadataProvider {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemMetadataProvider.class);
+
+    private static final String METADATA_XSD = "/metadata.xsd";
+    private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.metadata.generated";
+    private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    /**
+     * Load the JAXBContext.
+     */
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_GENERATED_PATH, FileSystemMetadataProvider.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.");
+        }
+    }
+
+    static final String METADATA_FILE_PROP = "Metadata File";
+
+    private File metadataFile;
+    private Schema metadataSchema;
+    private final AtomicReference<MetadataHolder> metadataHolder = new AtomicReference<>(null);
+
+    public FileSystemMetadataProvider() throws ProviderCreationException {
+        try {
+            final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+            metadataSchema = schemaFactory.newSchema(FileSystemMetadataProvider.class.getResource(METADATA_XSD));
+        } catch (SAXException e) {
+            throw new ProviderCreationException("Unable to create MetadataProvider due to: " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        final Map<String,String> config = configurationContext.getProperties();
+        if (!config.containsKey(METADATA_FILE_PROP)) {
+            throw new ProviderCreationException("The property " + METADATA_FILE_PROP + " must be provided");
+        }
+
+        final String metadataFileValue = config.get(METADATA_FILE_PROP);
+        if (StringUtils.isBlank(metadataFileValue)) {
+            throw new ProviderCreationException("The property " + METADATA_FILE_PROP + " cannot be null or blank");
+        }
+
+        try {
+            metadataFile = new File(metadataFileValue);
+            if (metadataFile.exists()) {
+                LOGGER.info("Loading metadata file from {}", new Object[] {metadataFile.getAbsolutePath()});
+                final Metadata metadata = unmarshallMetadata();
+                metadataHolder.set(new MetadataHolder(metadata));
+            } else {
+                LOGGER.info("Creating new metadata file at {}", new Object[] {metadataFile.getAbsolutePath()});
+
+                final Metadata metadata = new Metadata();
+                metadata.setBuckets(new Buckets());
+                metadata.setFlows(new Flows());
+
+                saveMetadata(metadata);
+                metadataHolder.set(new MetadataHolder(metadata));
+            }
+        } catch (Exception e) {
+            throw new ProviderCreationException("Unable to configure MetadataProvider due to: " + e.getMessage(), e);
+        }
+    }
+
+    private Metadata unmarshallMetadata() throws JAXBException {
+        final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+        unmarshaller.setSchema(metadataSchema);
+
+        final JAXBElement<Metadata> element = unmarshaller.unmarshal(new StreamSource(metadataFile), Metadata.class);
+        return element.getValue();
+    }
+
+    private void saveMetadata(final Metadata metadata) throws JAXBException {
+        final Marshaller marshaller = JAXB_CONTEXT.createMarshaller();
+        marshaller.setSchema(metadataSchema);
+        marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+        marshaller.marshal(metadata, metadataFile);
+    }
+
+    private synchronized void saveAndRefresh(final Metadata metadata) {
+        try {
+            saveMetadata(metadata);
+            metadataHolder.set(new MetadataHolder(metadata));
+        } catch (JAXBException e) {
+            throw new MetadataProviderException("Unable to save metadata", e);
+        }
+    }
+
+    @Override
+    public synchronized Bucket createBucket(final Bucket bucket) {
+        if (bucket == null) {
+            throw new IllegalArgumentException("Bucket cannot be null");
+        }
+
+        final org.apache.nifi.registry.metadata.generated.Bucket jaxbBucket = new org.apache.nifi.registry.metadata.generated.Bucket();
+        jaxbBucket.setIdentifier(bucket.getIdentifier());
+        jaxbBucket.setName(bucket.getName());
+        jaxbBucket.setDescription(bucket.getDescription());
+        jaxbBucket.setCreatedTimestamp(bucket.getCreatedTimestamp());
+
+        final MetadataHolder holder = metadataHolder.get();
+
+        final Metadata metadata = holder.getMetadata();
+        metadata.getBuckets().getBucket().add(jaxbBucket);
+
+        saveAndRefresh(metadata);
+        return metadataHolder.get().getBucketsBydId().get(bucket.getIdentifier());
+    }
+
+    @Override
+    public Bucket getBucket(final String bucketIdentifier) {
+        if (bucketIdentifier == null) {
+            throw new IllegalArgumentException("Bucket Identifier cannot be null");
+        }
+
+        final MetadataHolder holder = metadataHolder.get();
+        return holder.getBucketsBydId().get(bucketIdentifier);
+    }
+
+    @Override
+    public Set<Bucket> getBuckets() {
+        final MetadataHolder holder = metadataHolder.get();
+        final Map<String,Bucket> bucketsBydId = holder.getBucketsBydId();
+        return new HashSet<>(bucketsBydId.values());
+    }
+
+    @Override
+    public synchronized Bucket updateBucket(final Bucket bucket) {
+        if (bucket == null) {
+            throw new IllegalArgumentException("Bucket cannot be null");
+        }
+
+        final MetadataHolder holder = metadataHolder.get();
+        final Buckets buckets = holder.getMetadata().getBuckets();
+
+        final org.apache.nifi.registry.metadata.generated.Bucket jaxbBucket = buckets.getBucket().stream()
+                .filter(b -> bucket.getIdentifier().equals(b.getIdentifier()))
+                .findFirst()
+                .orElse(null);
+
+        if (jaxbBucket == null) {
+            return null;
+        }
+
+        jaxbBucket.setName(bucket.getName());
+        jaxbBucket.setDescription(bucket.getDescription());
+
+        saveAndRefresh(holder.getMetadata());
+        return metadataHolder.get().getBucketsBydId().get(bucket.getIdentifier());
+    }
+
+    @Override
+    public synchronized void deleteBucket(final String bucketIdentifier) {
+        if (bucketIdentifier == null) {
+            throw new IllegalArgumentException("Bucket Identifier cannot be null");
+        }
+
+        final MetadataHolder holder = metadataHolder.get();
+        final Flows flows = holder.getMetadata().getFlows();
+        final Buckets buckets = holder.getMetadata().getBuckets();
+
+        // first remove any flow that reference the bucket
+        boolean deletedFlow = false;
+        final Iterator<Flow> flowIterator = flows.getFlow().iterator();
+        while (flowIterator.hasNext()) {
+            final Flow flow = flowIterator.next();
+            if (flow.getBucketIdentifier().equals(bucketIdentifier)) {
+                flowIterator.remove();
+                deletedFlow = true;
+            }
+        }
+
+        // now delete the actual bucket
+        boolean deleteBucket = false;
+        final Iterator<org.apache.nifi.registry.metadata.generated.Bucket> bucketIterator = buckets.getBucket().iterator();
+        while (bucketIterator.hasNext()) {
+            final org.apache.nifi.registry.metadata.generated.Bucket bucket = bucketIterator.next();
+            if (bucket.getIdentifier().equals(bucketIdentifier)) {
+               bucketIterator.remove();
+               deleteBucket = true;
+               break;
+            }
+        }
+
+        if (deletedFlow || deleteBucket) {
+            saveAndRefresh(holder.getMetadata());
+        }
+    }
+
+    @Override
+    public synchronized VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) {
+        if (bucketIdentifier == null) {
+            throw new IllegalArgumentException("Bucket Identifier cannot be blank");
+        }
+
+        if (versionedFlow == null) {
+            throw new IllegalArgumentException("Versioned Flow cannot be null");
+        }
+
+        final MetadataHolder holder = metadataHolder.get();
+
+        final Bucket bucket = holder.getBucketsBydId().get(bucketIdentifier);
+        if (bucket == null) {
+            throw new IllegalStateException("Unable to create Versioned Flow because Bucket does not exist with id " + bucketIdentifier);
+        }
+
+        final Flow jaxbFlow = new Flow();
+        jaxbFlow.setIdentifier(versionedFlow.getIdentifier());
+        jaxbFlow.setName(versionedFlow.getName());
+        jaxbFlow.setDescription(versionedFlow.getDescription());
+        jaxbFlow.setCreatedTimestamp(versionedFlow.getCreatedTimestamp());
+        jaxbFlow.setModifiedTimestamp(versionedFlow.getModifiedTimestamp());
+        jaxbFlow.setBucketIdentifier(bucketIdentifier);
+
+        final Metadata metadata = holder.getMetadata();
+        metadata.getFlows().getFlow().add(jaxbFlow);
+
+        saveAndRefresh(metadata);
+        return metadataHolder.get().getFlowsById().get(versionedFlow.getIdentifier());
+    }
+
+    @Override
+    public VersionedFlow getFlow(final String flowIdentifier) {
+        if (flowIdentifier == null) {
+            throw new IllegalArgumentException("Flow Identifier cannot be null");
+        }
+
+        final MetadataHolder holder = metadataHolder.get();
+        return holder.getFlowsById().get(flowIdentifier);
+    }
+
+    @Override
+    public Set<VersionedFlow> getFlows() {
+        final MetadataHolder holder = metadataHolder.get();
+        final Map<String,VersionedFlow> flowsById = holder.getFlowsById();
+        return new HashSet<>(flowsById.values());
+    }
+
+    @Override
+    public Set<VersionedFlow> getFlows(String bucketId) {
+        final MetadataHolder holder = metadataHolder.get();
+
+        final Map<String,Set<VersionedFlow>> flowsByBucket = holder.getFlowsByBucket();
+        if (flowsByBucket.containsKey(bucketId)) {
+            return new HashSet<>(flowsByBucket.get(bucketId));
+        } else {
+            return Collections.emptySet();
+        }
+    }
+
+    @Override
+    public synchronized VersionedFlow updateFlow(final VersionedFlow versionedFlow) {
+        if (versionedFlow == null) {
+            throw new IllegalArgumentException("Versioned Flow cannot be null");
+        }
+
+        final MetadataHolder holder = metadataHolder.get();
+        final Flows flows = holder.getMetadata().getFlows();
+
+        final Flow jaxbFlow = flows.getFlow().stream()
+                .filter(f -> versionedFlow.getIdentifier().equals(f.getIdentifier()))
+                .findFirst()
+                .orElse(null);
+
+        if (jaxbFlow == null) {
+            return null;
+        }
+
+        // TODO should we allow changing the bucket id here, if so it needs to be passed in
+        jaxbFlow.setName(versionedFlow.getName());
+        jaxbFlow.setDescription(versionedFlow.getDescription());
+        jaxbFlow.setModifiedTimestamp(System.currentTimeMillis());
+
+        saveAndRefresh(holder.getMetadata());
+        return metadataHolder.get().getFlowsById().get(versionedFlow.getIdentifier());
+    }
+
+    @Override
+    public synchronized void deleteFlow(final String flowIdentifier) {
+        if (flowIdentifier == null) {
+            throw new IllegalArgumentException("Flow Identifier cannot be null");
+        }
+
+        final MetadataHolder holder = metadataHolder.get();
+        final Flows flows = holder.getMetadata().getFlows();
+
+        boolean deleted = false;
+        final Iterator<Flow> flowIter = flows.getFlow().iterator();
+
+        while (flowIter.hasNext()) {
+            final Flow jaxbFlow = flowIter.next();
+            if (jaxbFlow.getIdentifier().equals(flowIdentifier)) {
+                flowIter.remove();
+                deleted = true;
+                break;
+            }
+        }
+
+        if (deleted) {
+            saveAndRefresh(holder.getMetadata());
+        }
+    }
+
+    @Override
+    public synchronized VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) {
+        if (flowSnapshot == null) {
+            throw new IllegalArgumentException("Versioned Flow Snapshot cannot be null");
+        }
+
+        final String flowIdentifier = flowSnapshot.getFlowIdentifier();
+        final int snapshotVersion = flowSnapshot.getVersion();
+
+        final MetadataHolder holder = metadataHolder.get();
+        final Flows flows = holder.getMetadata().getFlows();
+
+        final Flow jaxbFlow = flows.getFlow().stream()
+                .filter(f -> flowIdentifier.equals(f.getIdentifier()))
+                .findFirst()
+                .orElse(null);
+
+        if (jaxbFlow == null) {
+            throw new IllegalStateException("Unable to create snapshot because Versioned Flow does not exist for id " + flowIdentifier);
+        }
+
+        final Flow.Snapshot jaxbSnapshot = new Flow.Snapshot();
+        jaxbSnapshot.setVersion(flowSnapshot.getVersion());
+        jaxbSnapshot.setComments(flowSnapshot.getComments());
+        jaxbSnapshot.setCreatedTimestamp(flowSnapshot.getTimestamp());
+
+        jaxbFlow.getSnapshot().add(jaxbSnapshot);
+        saveAndRefresh(holder.getMetadata());
+
+        final VersionedFlow versionedFlow = metadataHolder.get().getFlowsById().get(flowIdentifier);
+        return versionedFlow.getSnapshot(snapshotVersion);
+    }
+
+    @Override
+    public VersionedFlowSnapshot getFlowSnapshot(final String flowIdentifier, final Integer version) {
+        if (flowIdentifier == null) {
+            throw new IllegalArgumentException("Flow Identifier cannot be null");
+        }
+
+        if (version == null) {
+            throw new IllegalArgumentException("Version cannot be null");
+        }
+
+        final MetadataHolder holder = metadataHolder.get();
+
+        final VersionedFlow versionedFlow = holder.getFlowsById().get(flowIdentifier);
+        if (versionedFlow == null) {
+            return null;
+        }
+
+        return versionedFlow.getSnapshot(version);
+    }
+
+    @Override
+    public synchronized void deleteFlowSnapshot(final String flowIdentifier, final Integer version) {
+        if (flowIdentifier == null) {
+            throw new IllegalArgumentException("Flow Identifier cannot be null");
+        }
+
+        if (version == null) {
+            throw new IllegalArgumentException("Version cannot be null");
+        }
+
+        final MetadataHolder holder = metadataHolder.get();
+        final Flows flows = holder.getMetadata().getFlows();
+
+        final Flow jaxbFlow = flows.getFlow().stream()
+                .filter(f -> flowIdentifier.equals(f.getIdentifier()))
+                .findFirst()
+                .orElse(null);
+
+        if (jaxbFlow == null) {
+            return;
+        }
+
+        boolean deletedSnapshot = false;
+        final Iterator<Flow.Snapshot> snapshotIterator = jaxbFlow.getSnapshot().iterator();
+
+        while (snapshotIterator.hasNext()) {
+            final Flow.Snapshot snapshot = snapshotIterator.next();
+            if (snapshot.getVersion().equals(version)) {
+                snapshotIterator.remove();
+                deletedSnapshot = true;
+                break;
+            }
+        }
+
+        if (deletedSnapshot) {
+            saveAndRefresh(holder.getMetadata());
+        }
+    }
+
+}