You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2019/08/05 18:34:48 UTC

[nifi-registry] branch master updated: NIFIREG-285 - Add DatabaseFlowPersistenceProvider

This is an automated email from the ASF dual-hosted git repository.

kdoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-registry.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f19a73  NIFIREG-285 - Add DatabaseFlowPersistenceProvider
6f19a73 is described below

commit 6f19a736c00242affb82a0d326e686be5a2e1b50
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Mon Jul 15 11:49:21 2019 -0400

    NIFIREG-285 - Add DatabaseFlowPersistenceProvider
    
    - Added method injection of DataSource in StandardProviderFactory
    - Split out postgres migrations to use BYTEA for blob type
    
    This closes #205.
    
    Signed-off-by: Kevin Doran <kd...@apache.org>
---
 .../src/main/asciidoc/administration-guide.adoc    |  23 ++-
 .../registry/db/CustomFlywayConfiguration.java     |   7 +
 .../registry/provider/StandardProviderFactory.java |  52 +++++-
 .../flow/DatabaseFlowPersistenceProvider.java      |  86 ++++++++++
 ...ache.nifi.registry.flow.FlowPersistenceProvider |   3 +-
 .../migration/default/V6__AddFlowPersistence.sql   |  22 +++
 .../db/migration/mysql/V6__AddFlowPersistence.sql  |  22 +++
 .../db/migration/postgres/V2__Initial.sql          |  60 +++++++
 .../db/migration/postgres/V3__AddExtensions.sql    | 105 ++++++++++++
 .../migration/postgres/V4__AddCascadeOnDelete.sql  |  23 +++
 .../postgres/V5__AddBucketPublicFlags.sql          |  16 ++
 .../migration/postgres/V6__AddFlowPersistence.sql  |  22 +++
 .../provider/TestStandardProviderFactory.java      |  18 ++-
 .../flow/TestDatabaseFlowPersistenceProvider.java  | 100 ++++++++++++
 .../provider/hook/TestScriptEventHookProvider.java |  12 +-
 .../nifi/registry/provider/ProviderContext.java    |  34 ++++
 .../src/main/resources/conf/providers.xml          |   6 +
 .../nifi/registry/web/api/DBFlowStorageIT.java     | 178 +++++++++++++++++++++
 .../application-ITDBFlowStorage.properties}        |   9 +-
 .../conf/db-flow-storage/nifi-registry.properties} |  16 +-
 .../resources/conf/providers-db-flow-storage.xml   |  29 ++++
 .../FlowPersistenceProviderMigrator.java           |  13 +-
 22 files changed, 832 insertions(+), 24 deletions(-)

diff --git a/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc b/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc
index 29b5589..a28dc50 100644
--- a/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc
@@ -1118,7 +1118,7 @@ Currently, NiFi Registry supports using H2, Postgres 9.x, and MySQL (5.6, 5.7, 8
 
 NOTE: NiFi Registry 0.1.0 only supports H2.
 
-== H2
+=== H2
 
 H2 is an embedded database that is pre-configured in the default _nifi-registry.properties_ file. The contents of the H2 database are stored in a file on the local filesystem.
 
@@ -1130,7 +1130,7 @@ For NiFi Registry 0.2.0 and forward, the location of the H2 database is specifie
 
 `nifi.registry.db.url=jdbc:h2:./database/nifi-registry-primary;`
 
-== Postgres
+=== Postgres
 
 Postgres provides the option to use an externally located database that also supports high availability.
 
@@ -1159,7 +1159,7 @@ The following steps are required to use Postgres:
   nifi.registry.db.username=nifireg
   nifi.registry.db.password=changeme
 
-== MySQL
+=== MySQL
 
 MySQL also provides the option to use an externally located database that also supports high availability.
 
@@ -1357,6 +1357,23 @@ Host bitbucket.org
   IdentityFile ~/.ssh/key-for-bitbucket
 ....
 
+==== DatabaseFlowPersistenceProvider
+
+`DatabaseFlowPersistenceProvider` stores flow contents in a database table.
+
+This provider leverages the same database used for the metadata database, so there is no configuration to provide since the
+connection details will come from the database properties in `nifi-registry.properties`.
+
+The database table is named `FLOW_PERSISTENCE_PROVIDER` and has the following schema:
+
+|====
+|*Column*|*Description*
+|BUCKET_ID|The identifier of the bucket where the flow is located.
+|FLOW_ID|The identifier of the flow.
+|VERSION|The version of the flow.
+|FLOW_CONTENT|The serialized bytes of the flow content stored as a BLOB.
+|====
+
 ==== Switching from other Flow Persistence Provider
 
 In order to switch the Flow Persistence Provider, it is necessary to reset NiFi Registry.
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java
index 0cbf64f..0288f9d 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java
@@ -41,6 +41,9 @@ public class CustomFlywayConfiguration implements FlywayConfigurationCustomizer
     private static final String LOCATION_MYSQL = "classpath:db/migration/mysql";
     private static final String[] LOCATIONS_MYSQL = {LOCATION_COMMON, LOCATION_MYSQL};
 
+    private static final String LOCATION_POSTGRES = "classpath:db/migration/postgres";
+    private static final String[] LOCATIONS_POSTGRES = {LOCATION_COMMON, LOCATION_POSTGRES};
+
     @Override
     public void customize(final FluentConfiguration configuration) {
         final DatabaseType databaseType = getDatabaseType(configuration.getDataSource());
@@ -51,6 +54,10 @@ public class CustomFlywayConfiguration implements FlywayConfigurationCustomizer
                 LOGGER.info("Setting migration locations to {}", new Object[] {LOCATIONS_MYSQL});
                 configuration.locations(LOCATIONS_MYSQL);
                 break;
+            case POSTGRESQL:
+                LOGGER.info("Setting migration locations to {}", new Object[] {LOCATIONS_POSTGRES});
+                configuration.locations(LOCATIONS_POSTGRES);
+                break;
             default:
                 LOGGER.info("Setting migration locations to {}", new Object[] {LOCATIONS_DEFAULT});
                 configuration.locations(LOCATIONS_DEFAULT);
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
index acd7705..c9eb9f5 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
@@ -32,6 +32,7 @@ import org.springframework.context.annotation.Configuration;
 import org.xml.sax.SAXException;
 
 import javax.annotation.PostConstruct;
+import javax.sql.DataSource;
 import javax.xml.XMLConstants;
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBElement;
@@ -42,6 +43,8 @@ import javax.xml.validation.Schema;
 import javax.xml.validation.SchemaFactory;
 import java.io.File;
 import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -73,6 +76,7 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
 
     private final NiFiRegistryProperties properties;
     private final ExtensionManager extensionManager;
+    private final DataSource dataSource;
     private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null);
 
     private FlowPersistenceProvider flowPersistenceProvider;
@@ -80,9 +84,10 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
     private BundlePersistenceProvider bundlePersistenceProvider;
 
     @Autowired
-    public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) {
+    public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager, final DataSource dataSource) {
         this.properties = properties;
         this.extensionManager = extensionManager;
+        this.dataSource = dataSource;
 
         if (this.properties == null) {
             throw new IllegalStateException("NiFiRegistryProperties cannot be null");
@@ -91,6 +96,10 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
         if (this.extensionManager == null) {
             throw new IllegalStateException("ExtensionManager cannot be null");
         }
+
+        if (this.dataSource == null) {
+            throw new IllegalStateException("DataSource cannot be null");
+        }
     }
 
     @PostConstruct
@@ -144,14 +153,16 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
                 final Constructor constructor = flowProviderClass.getConstructor();
                 flowPersistenceProvider = (FlowPersistenceProvider) constructor.newInstance();
 
-                LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName});
+                performMethodInjection(flowPersistenceProvider, flowProviderClass);
+
+                LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[]{flowProviderClassName});
             } catch (Exception e) {
                 throw new ProviderFactoryException("Error creating FlowPersistenceProvider with class name: " + flowProviderClassName, e);
             }
 
             final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbFlowProvider.getProperty());
             flowPersistenceProvider.onConfigured(configurationContext);
-            LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName});
+            LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[]{flowProviderClassName});
         }
 
         return flowPersistenceProvider;
@@ -192,6 +203,8 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
                     final Constructor constructor = hookProviderClass.getConstructor();
                     hook = (EventHookProvider) constructor.newInstance();
 
+                    performMethodInjection(hook, hookProviderClass);
+
                     LOGGER.info("Instantiated EventHookProvider with class name {}", new Object[] {hookProviderClassName});
                 } catch (Exception e) {
                     throw new ProviderFactoryException("Error creating EventHookProvider with class name: " + hookProviderClassName, e);
@@ -233,6 +246,8 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
                 final Constructor constructor = extensionBundleProviderClass.getConstructor();
                 bundlePersistenceProvider = (BundlePersistenceProvider) constructor.newInstance();
 
+                performMethodInjection(bundlePersistenceProvider, extensionBundleProviderClass);
+
                 LOGGER.info("Instantiated BundlePersistenceProvider with class name {}", new Object[] {extensionBundleProviderClassName});
             } catch (Exception e) {
                 throw new ProviderFactoryException("Error creating BundlePersistenceProvider with class name: " + extensionBundleProviderClassName, e);
@@ -273,4 +288,35 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
         return new StandardProviderConfigurationContext(properties);
     }
 
+    private void performMethodInjection(final Object instance, final Class providerClass) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+        for (final Method method : providerClass.getMethods()) {
+            if (method.isAnnotationPresent(ProviderContext.class)) {
+                // make the method accessible
+                final boolean isAccessible = method.isAccessible();
+                method.setAccessible(true);
+
+                try {
+                    final Class<?>[] argumentTypes = method.getParameterTypes();
+
+                    // look for setters (single argument)
+                    if (argumentTypes.length == 1) {
+                        final Class<?> argumentType = argumentTypes[0];
+
+                        // look for well known types, currently we only support injecting the DataSource
+                        if (DataSource.class.isAssignableFrom(argumentType)) {
+                            method.invoke(instance, dataSource);
+                        }
+                    }
+                } finally {
+                    method.setAccessible(isAccessible);
+                }
+            }
+        }
+
+        final Class parentClass = providerClass.getSuperclass();
+        if (parentClass != null && Provider.class.isAssignableFrom(parentClass)) {
+            performMethodInjection(instance, parentClass);
+        }
+    }
+
 }
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/DatabaseFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/DatabaseFlowPersistenceProvider.java
new file mode 100644
index 0000000..ee38592
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/DatabaseFlowPersistenceProvider.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import javax.sql.DataSource;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A FlowPersistenceProvider that uses a database table for storage. The intent is to use the same database as the rest
+ * of the application so that all data can be stored together and benefit from any replication/scaling of the database.
+ */
+public class DatabaseFlowPersistenceProvider implements FlowPersistenceProvider {
+
+    private DataSource dataSource;
+    private JdbcTemplate jdbcTemplate;
+
+    @ProviderContext
+    public void setDataSource(final DataSource dataSource) {
+        this.dataSource = dataSource;
+        this.jdbcTemplate = new JdbcTemplate(this.dataSource);
+    }
+
+    @Override
+    public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        // there is no config since we get the DataSource from the framework
+    }
+
+    @Override
+    public void saveFlowContent(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException {
+        final String sql = "INSERT INTO FLOW_PERSISTENCE_PROVIDER (BUCKET_ID, FLOW_ID, VERSION, FLOW_CONTENT) VALUES (?, ?, ?, ?)";
+        jdbcTemplate.update(sql, context.getBucketId(), context.getFlowId(), context.getVersion(), content);
+    }
+
+    @Override
+    public byte[] getFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final List<byte[]> results = new ArrayList<>();
+        final String sql = "SELECT FLOW_CONTENT FROM FLOW_PERSISTENCE_PROVIDER WHERE BUCKET_ID = ? and FLOW_ID = ? and VERSION = ?";
+
+        jdbcTemplate.query(sql, new Object[] {bucketId, flowId, version}, (rs) -> {
+            final byte[] content = rs.getBytes("FLOW_CONTENT");
+            results.add(content);
+        });
+
+        if (results.isEmpty()) {
+            return null;
+        } else {
+            return results.get(0);
+        }
+    }
+
+    @Override
+    public void deleteAllFlowContent(final String bucketId, final String flowId) throws FlowPersistenceException {
+        final String sql = "DELETE FROM FLOW_PERSISTENCE_PROVIDER WHERE BUCKET_ID = ? and FLOW_ID = ?";
+        jdbcTemplate.update(sql, bucketId, flowId);
+    }
+
+    @Override
+    public void deleteFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final String sql = "DELETE FROM FLOW_PERSISTENCE_PROVIDER WHERE BUCKET_ID = ? and FLOW_ID = ? and VERSION = ?";
+        jdbcTemplate.update(sql, bucketId, flowId, version);
+    }
+
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
index e456fa2..df57a73 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
-org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
\ No newline at end of file
+org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
+org.apache.nifi.registry.provider.flow.DatabaseFlowPersistenceProvider
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/default/V6__AddFlowPersistence.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/default/V6__AddFlowPersistence.sql
new file mode 100644
index 0000000..0765b70
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/default/V6__AddFlowPersistence.sql
@@ -0,0 +1,22 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE FLOW_PERSISTENCE_PROVIDER (
+    BUCKET_ID VARCHAR(50) NOT NULL,
+    FLOW_ID VARCHAR(50) NOT NULL,
+    VERSION INT NOT NULL,
+    FLOW_CONTENT BLOB NOT NULL,
+    CONSTRAINT PK__FLOW_PERSISTENCE_PROVIDER PRIMARY KEY (BUCKET_ID, FLOW_ID, VERSION)
+);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/mysql/V6__AddFlowPersistence.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/mysql/V6__AddFlowPersistence.sql
new file mode 100644
index 0000000..ad3d53f
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/mysql/V6__AddFlowPersistence.sql
@@ -0,0 +1,22 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE FLOW_PERSISTENCE_PROVIDER (
+    BUCKET_ID VARCHAR(50) NOT NULL,
+    FLOW_ID VARCHAR(50) NOT NULL,
+    VERSION INT NOT NULL,
+    FLOW_CONTENT LONGBLOB NOT NULL,
+    CONSTRAINT PK__FLOW_PERSISTENCE_PROVIDER PRIMARY KEY (BUCKET_ID, FLOW_ID, VERSION)
+);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V2__Initial.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V2__Initial.sql
new file mode 100644
index 0000000..b992d23
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V2__Initial.sql
@@ -0,0 +1,60 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- The NAME column has a max size of 768 because this is the largest size that MySQL allows when using a unique constraint.
+CREATE TABLE BUCKET (
+    ID VARCHAR(50) NOT NULL,
+    NAME VARCHAR(1000) NOT NULL,
+    DESCRIPTION TEXT,
+    CREATED TIMESTAMP NOT NULL,
+    CONSTRAINT PK__BUCKET_ID PRIMARY KEY (ID),
+    CONSTRAINT UNIQUE__BUCKET_NAME UNIQUE (NAME)
+);
+
+CREATE TABLE BUCKET_ITEM (
+    ID VARCHAR(50) NOT NULL,
+    NAME VARCHAR(1000) NOT NULL,
+    DESCRIPTION TEXT,
+    CREATED TIMESTAMP NOT NULL,
+    MODIFIED TIMESTAMP NOT NULL,
+    ITEM_TYPE VARCHAR(50) NOT NULL,
+    BUCKET_ID VARCHAR(50) NOT NULL,
+    CONSTRAINT PK__BUCKET_ITEM_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID)
+);
+
+CREATE TABLE FLOW (
+    ID VARCHAR(50) NOT NULL,
+    CONSTRAINT PK__FLOW_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__FLOW_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID)
+);
+
+CREATE TABLE FLOW_SNAPSHOT (
+    FLOW_ID VARCHAR(50) NOT NULL,
+    VERSION INT NOT NULL,
+    CREATED TIMESTAMP NOT NULL,
+    CREATED_BY VARCHAR(4096) NOT NULL,
+    COMMENTS TEXT,
+    CONSTRAINT PK__FLOW_SNAPSHOT_FLOW_ID_AND_VERSION PRIMARY KEY (FLOW_ID, VERSION),
+    CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID)
+);
+
+CREATE TABLE SIGNING_KEY (
+    ID VARCHAR(50) NOT NULL,
+    TENANT_IDENTITY VARCHAR(4096) NOT NULL,
+    KEY_VALUE VARCHAR(50) NOT NULL,
+    CONSTRAINT PK__SIGNING_KEY_ID PRIMARY KEY (ID),
+    CONSTRAINT UNIQUE__SIGNING_KEY_TENANT_IDENTITY UNIQUE (TENANT_IDENTITY)
+);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V3__AddExtensions.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V3__AddExtensions.sql
new file mode 100644
index 0000000..3bd9820
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V3__AddExtensions.sql
@@ -0,0 +1,105 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE BUNDLE (
+    ID VARCHAR(50) NOT NULL,
+    BUCKET_ID VARCHAR(50) NOT NULL,
+    BUNDLE_TYPE VARCHAR(200) NOT NULL,
+    GROUP_ID VARCHAR(500) NOT NULL,
+    ARTIFACT_ID VARCHAR(500) NOT NULL,
+    CONSTRAINT PK__EXTENSION_BUNDLE_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__EXTENSION_BUNDLE_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID) ON DELETE CASCADE,
+    CONSTRAINT FK__EXTENSION_BUNDLE_BUCKET_ID FOREIGN KEY(BUCKET_ID) REFERENCES BUCKET(ID) ON DELETE CASCADE,
+    CONSTRAINT UNIQUE__EXTENSION_BUNDLE_BUCKET_GROUP_ARTIFACT UNIQUE (BUCKET_ID, GROUP_ID, ARTIFACT_ID)
+);
+
+CREATE TABLE BUNDLE_VERSION (
+    ID VARCHAR(50) NOT NULL,
+    BUNDLE_ID VARCHAR(50) NOT NULL,
+    VERSION VARCHAR(100) NOT NULL,
+    CREATED TIMESTAMP NOT NULL,
+    CREATED_BY VARCHAR(4096) NOT NULL,
+    DESCRIPTION TEXT,
+    SHA_256_HEX VARCHAR(512) NOT NULL,
+    SHA_256_SUPPLIED INT NOT NULL,
+    CONTENT_SIZE BIGINT NOT NULL,
+    SYSTEM_API_VERSION VARCHAR(50),
+    BUILD_TOOL VARCHAR(100),
+    BUILD_FLAGS VARCHAR(100),
+    BUILD_BRANCH VARCHAR(200),
+    BUILD_TAG VARCHAR(200),
+    BUILD_REVISION VARCHAR(100),
+    BUILT TIMESTAMP,
+    BUILT_BY VARCHAR(4096),
+    CONSTRAINT PK__BUNDLE_VERSION_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__BUNDLE_VERSION_BUNDLE_ID FOREIGN KEY (BUNDLE_ID) REFERENCES BUNDLE(ID) ON DELETE CASCADE,
+    CONSTRAINT UNIQUE__BUNDLE_VERSION_BUNDLE_ID_VERSION UNIQUE (BUNDLE_ID, VERSION)
+);
+
+CREATE TABLE BUNDLE_VERSION_DEPENDENCY (
+    ID VARCHAR(50) NOT NULL,
+    BUNDLE_VERSION_ID VARCHAR(50) NOT NULL,
+    GROUP_ID VARCHAR(500) NOT NULL,
+    ARTIFACT_ID VARCHAR(500) NOT NULL,
+    VERSION VARCHAR(100) NOT NULL,
+    CONSTRAINT PK__BUNDLE_VERSION_DEPENDENCY_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__BUNDLE_VERSION_DEPENDENCY_BUNDLE_VERSION_ID FOREIGN KEY (BUNDLE_VERSION_ID) REFERENCES BUNDLE_VERSION(ID) ON DELETE CASCADE,
+    CONSTRAINT UNIQUE__BUNDLE_VERSION_DEPENDENCY_BUNDLE_ID_GROUP_ARTIFACT_VERSION UNIQUE (BUNDLE_VERSION_ID, GROUP_ID, ARTIFACT_ID, VERSION)
+);
+
+CREATE TABLE EXTENSION (
+    ID VARCHAR(50) NOT NULL,
+    BUNDLE_VERSION_ID VARCHAR(50) NOT NULL,
+    NAME VARCHAR(500) NOT NULL,
+    DISPLAY_NAME VARCHAR(500) NOT NULL,
+    TYPE VARCHAR(100) NOT NULL,
+    CONTENT TEXT NOT NULL,
+    ADDITIONAL_DETAILS TEXT,
+    HAS_ADDITIONAL_DETAILS INT NOT NULL,
+    CONSTRAINT PK__EXTENSION_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__EXTENSION_BUNDLE_VERSION_ID FOREIGN KEY (BUNDLE_VERSION_ID) REFERENCES BUNDLE_VERSION(ID) ON DELETE CASCADE,
+    CONSTRAINT UNIQUE__EXTENSION_BUNDLE_VERSION_ID_AND_NAME UNIQUE (BUNDLE_VERSION_ID, NAME)
+);
+
+CREATE TABLE EXTENSION_PROVIDED_SERVICE_API (
+    ID VARCHAR(50) NOT NULL,
+    EXTENSION_ID VARCHAR(50) NOT NULL,
+    CLASS_NAME VARCHAR (500) NOT NULL,
+    GROUP_ID VARCHAR(500) NOT NULL,
+    ARTIFACT_ID VARCHAR(500) NOT NULL,
+    VERSION VARCHAR(100) NOT NULL,
+    CONSTRAINT PK__EXTENSION_PROVIDED_SERVICE_API_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__EXTENSION_PROVIDED_SERVICE_API_EXTENSION_ID FOREIGN KEY (EXTENSION_ID) REFERENCES EXTENSION(ID) ON DELETE CASCADE,
+    CONSTRAINT UNIQUE__EXTENSION_PROVIDED_SERVICE_API UNIQUE (EXTENSION_ID, CLASS_NAME, GROUP_ID, ARTIFACT_ID, VERSION)
+);
+
+CREATE TABLE EXTENSION_RESTRICTION (
+    ID VARCHAR(50) NOT NULL,
+    EXTENSION_ID VARCHAR(50) NOT NULL,
+    REQUIRED_PERMISSION VARCHAR(200) NOT NULL,
+    EXPLANATION VARCHAR (4096) NOT NULL,
+    CONSTRAINT PK__EXTENSION_RESTRICTION_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__EXTENSION_RESTRICTION_EXTENSION_ID FOREIGN KEY (EXTENSION_ID) REFERENCES EXTENSION(ID) ON DELETE CASCADE,
+    CONSTRAINT UNIQUE__EXTENSION_RESTRICTION_EXTENSION_ID_REQUIRED_PERMISSION UNIQUE (EXTENSION_ID, REQUIRED_PERMISSION)
+);
+
+CREATE TABLE EXTENSION_TAG (
+    EXTENSION_ID VARCHAR(50) NOT NULL,
+    TAG VARCHAR(200) NOT NULL,
+    CONSTRAINT PK__EXTENSION_TAG_EXTENSION_ID_AND_TAG PRIMARY KEY (EXTENSION_ID, TAG),
+    CONSTRAINT FK__EXTENSION_TAG_EXTENSION_ID FOREIGN KEY (EXTENSION_ID) REFERENCES EXTENSION(ID) ON DELETE CASCADE
+);
+
+ALTER TABLE BUCKET ADD ALLOW_EXTENSION_BUNDLE_REDEPLOY INT NOT NULL DEFAULT (0);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V4__AddCascadeOnDelete.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V4__AddCascadeOnDelete.sql
new file mode 100644
index 0000000..5b0e6c6
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V4__AddCascadeOnDelete.sql
@@ -0,0 +1,23 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+ALTER TABLE BUCKET_ITEM DROP CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID;
+ALTER TABLE BUCKET_ITEM ADD CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID) ON DELETE CASCADE;
+
+ALTER TABLE FLOW DROP CONSTRAINT FK__FLOW_BUCKET_ITEM_ID;
+ALTER TABLE FLOW ADD CONSTRAINT FK__FLOW_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID) ON DELETE CASCADE;
+
+ALTER TABLE FLOW_SNAPSHOT DROP CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID;
+ALTER TABLE FLOW_SNAPSHOT ADD CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID) ON DELETE CASCADE;
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V5__AddBucketPublicFlags.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V5__AddBucketPublicFlags.sql
new file mode 100644
index 0000000..ef7478b
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V5__AddBucketPublicFlags.sql
@@ -0,0 +1,16 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+ALTER TABLE BUCKET ADD ALLOW_PUBLIC_READ INT NOT NULL DEFAULT (0);
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V6__AddFlowPersistence.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V6__AddFlowPersistence.sql
new file mode 100644
index 0000000..395760b
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V6__AddFlowPersistence.sql
@@ -0,0 +1,22 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE FLOW_PERSISTENCE_PROVIDER (
+    BUCKET_ID VARCHAR(50) NOT NULL,
+    FLOW_ID VARCHAR(50) NOT NULL,
+    VERSION INT NOT NULL,
+    FLOW_CONTENT BYTEA NOT NULL,
+    CONSTRAINT PK__FLOW_PERSISTENCE_PROVIDER PRIMARY KEY (BUCKET_ID, FLOW_ID, VERSION)
+);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
index ce374f7..0d23110 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
@@ -23,6 +23,8 @@ import org.apache.nifi.registry.properties.NiFiRegistryProperties;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.sql.DataSource;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.any;
@@ -38,7 +40,9 @@ public class TestStandardProviderFactory {
         final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
         when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
 
-        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+        final DataSource dataSource = Mockito.mock(DataSource.class);
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
         providerFactory.initialize();
 
         final FlowPersistenceProvider flowPersistenceProvider = providerFactory.getFlowPersistenceProvider();
@@ -66,7 +70,9 @@ public class TestStandardProviderFactory {
         final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
         when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
 
-        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+        final DataSource dataSource = Mockito.mock(DataSource.class);
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
         providerFactory.getFlowPersistenceProvider();
     }
 
@@ -78,7 +84,9 @@ public class TestStandardProviderFactory {
         final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
         when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
 
-        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+        final DataSource dataSource = Mockito.mock(DataSource.class);
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
         providerFactory.initialize();
     }
 
@@ -90,7 +98,9 @@ public class TestStandardProviderFactory {
         final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
         when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
 
-        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+        final DataSource dataSource = Mockito.mock(DataSource.class);
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
         providerFactory.initialize();
 
         providerFactory.getFlowPersistenceProvider();
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestDatabaseFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestDatabaseFlowPersistenceProvider.java
new file mode 100644
index 0000000..5314860
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestDatabaseFlowPersistenceProvider.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.nifi.registry.db.DatabaseTestApplication;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.TestExecutionListeners;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
+import org.springframework.test.context.transaction.TransactionalTestExecutionListener;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.sql.DataSource;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
+
+@Transactional
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = DatabaseTestApplication.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
+@TestExecutionListeners({DependencyInjectionTestExecutionListener.class, TransactionalTestExecutionListener.class})
+public class TestDatabaseFlowPersistenceProvider {
+
+    @Autowired
+    private DataSource dataSource;
+
+    private FlowPersistenceProvider persistenceProvider;
+
+    @Before
+    public void setup() {
+        persistenceProvider = new DatabaseFlowPersistenceProvider();
+        ((DatabaseFlowPersistenceProvider)persistenceProvider).setDataSource(dataSource);
+    }
+
+    @Test
+    public void testAll() {
+        // Save two versions of a flow...
+        final FlowSnapshotContext context1 = getFlowSnapshotContext("b1", "f1", 1);
+        final byte[] content1 = "f1v1".getBytes(StandardCharsets.UTF_8);
+        persistenceProvider.saveFlowContent(context1, content1);
+
+        final FlowSnapshotContext context2 = getFlowSnapshotContext("b1", "f1", 2);
+        final byte[] content2 = "f1v2".getBytes(StandardCharsets.UTF_8);
+        persistenceProvider.saveFlowContent(context2, content2);
+
+        // Verify we can retrieve both versions and that the content is correct
+        final byte[] retrievedContent1 = persistenceProvider.getFlowContent(context1.getBucketId(), context1.getFlowId(), context1.getVersion());
+        assertNotNull(retrievedContent1);
+        assertEquals("f1v1", new String(retrievedContent1, StandardCharsets.UTF_8));
+
+        final byte[] retrievedContent2 = persistenceProvider.getFlowContent(context2.getBucketId(), context2.getFlowId(), context2.getVersion());
+        assertNotNull(retrievedContent2);
+        assertEquals("f1v2", new String(retrievedContent2, StandardCharsets.UTF_8));
+
+        // Delete a specific version and verify we can longer retrieve it
+        persistenceProvider.deleteFlowContent(context1.getBucketId(), context1.getFlowId(), context1.getVersion());
+
+        final byte[] deletedContent1 = persistenceProvider.getFlowContent(context1.getBucketId(), context1.getFlowId(), context1.getVersion());
+        assertNull(deletedContent1);
+
+        // Delete all content for a flow
+        persistenceProvider.deleteAllFlowContent(context1.getBucketId(), context1.getFlowId());
+
+        final byte[] deletedContent2 = persistenceProvider.getFlowContent(context2.getBucketId(), context2.getFlowId(), context2.getVersion());
+        assertNull(deletedContent2);
+    }
+
+    private FlowSnapshotContext getFlowSnapshotContext(final String bucketId, final String flowId, final int version) {
+        final FlowSnapshotContext context = Mockito.mock(FlowSnapshotContext.class);
+        when(context.getBucketId()).thenReturn(bucketId);
+        when(context.getFlowId()).thenReturn(flowId);
+        when(context.getVersion()).thenReturn(version);
+        return context;
+    }
+
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java
index ab24998..b7d241b 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java
@@ -16,9 +16,6 @@
  */
 package org.apache.nifi.registry.provider.hook;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.when;
-
 import org.apache.nifi.registry.extension.ExtensionManager;
 import org.apache.nifi.registry.properties.NiFiRegistryProperties;
 import org.apache.nifi.registry.provider.ProviderCreationException;
@@ -27,6 +24,11 @@ import org.apache.nifi.registry.provider.StandardProviderFactory;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import javax.sql.DataSource;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
 public class TestScriptEventHookProvider {
 
     @Test(expected = ProviderCreationException.class)
@@ -37,7 +39,9 @@ public class TestScriptEventHookProvider {
         final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
         when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
 
-        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+        final DataSource dataSource = Mockito.mock(DataSource.class);
+
+        final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
         providerFactory.initialize();
         providerFactory.getEventHookProviders();
     }
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderContext.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderContext.java
new file mode 100644
index 0000000..9729176
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for setter methods in a provider to indicate the framework should inject the requested resource.
+ */
+@Documented
+@Target({ElementType.FIELD, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface ProviderContext {
+}
diff --git a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml
index f41eee8..3eb138f 100644
--- a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml
+++ b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml
@@ -37,6 +37,12 @@
     -->
 
     <!--
+    <flowPersistenceProvider>
+        <class>org.apache.nifi.registry.provider.flow.DatabaseFlowPersistenceProvider</class>
+    </flowPersistenceProvider>
+    -->
+
+    <!--
     <eventHookProvider>
     	<class>org.apache.nifi.registry.provider.hook.ScriptEventHookProvider</class>
     	<property name="Script Path"></property>
diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/DBFlowStorageIT.java b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/DBFlowStorageIT.java
new file mode 100644
index 0000000..b438eaa
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/DBFlowStorageIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.web.api;
+
+import org.apache.nifi.registry.NiFiRegistryTestApiApplication;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryClientConfig;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.jdbc.Sql;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(
+        classes = NiFiRegistryTestApiApplication.class,
+        webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+        properties = "spring.profiles.include=ITDBFlowStorage")
+@Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = "classpath:db/clearDB.sql")
+public class DBFlowStorageIT extends IntegrationTestBase {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(UnsecuredNiFiRegistryClientIT.class);
+
+    private NiFiRegistryClient client;
+
+    @Before
+    public void setup() throws IOException {
+        final String baseUrl = createBaseURL();
+        LOGGER.info("Using base url = " + baseUrl);
+
+        final NiFiRegistryClientConfig clientConfig = new NiFiRegistryClientConfig.Builder()
+                .baseUrl(baseUrl)
+                .build();
+        assertNotNull(clientConfig);
+
+        final NiFiRegistryClient client = new JerseyNiFiRegistryClient.Builder()
+                .config(clientConfig)
+                .build();
+        assertNotNull(client);
+        this.client = client;
+    }
+
+    @After
+    public void teardown() {
+        try {
+            client.close();
+        } catch (Exception e) {
+            LOGGER.warn(e.getMessage(), e);
+        }
+    }
+
+    @Test
+    public void testAll() throws IOException, NiFiRegistryException {
+        // Create two buckets...
+
+        final Bucket b1 = new Bucket();
+        b1.setName("b1");
+
+        final Bucket createdB1 = client.getBucketClient().create(b1);
+        assertNotNull(createdB1);
+
+        final Bucket b2 = new Bucket();
+        b2.setName("b2");
+
+        final Bucket createdB2 = client.getBucketClient().create(b2);
+        assertNotNull(createdB2);
+
+        // Create two flows...
+
+        final VersionedFlow f1 = new VersionedFlow();
+        f1.setName("f1");
+        f1.setBucketIdentifier(createdB1.getIdentifier());
+
+        final VersionedFlow createdF1 = client.getFlowClient().create(f1);
+        assertNotNull(createdF1);
+
+        final VersionedFlow f2 = new VersionedFlow();
+        f2.setName("f2");
+        f2.setBucketIdentifier(createdB2.getIdentifier());
+
+        final VersionedFlow createdF2 = client.getFlowClient().create(f2);
+        assertNotNull(createdF2);
+
+        // Create some versions for each flow...
+
+        final VersionedFlowSnapshot snapshotF1V1 = createSnapshot(createdB1, createdF1, 1, "f1v1");
+        final VersionedFlowSnapshot createdSnapshotF1V1 = client.getFlowSnapshotClient().create(snapshotF1V1);
+        assertNotNull(createdSnapshotF1V1);
+
+        final VersionedFlowSnapshot snapshotF1V2 = createSnapshot(createdB1, createdF1, 2, "f1v2");
+        final VersionedFlowSnapshot createdSnapshotF1V2 = client.getFlowSnapshotClient().create(snapshotF1V2);
+        assertNotNull(createdSnapshotF1V2);
+
+        final VersionedFlowSnapshot snapshotF2V1 = createSnapshot(createdB2, createdF2, 1, "f2v1");
+        final VersionedFlowSnapshot createdSnapshotF2V1 = client.getFlowSnapshotClient().create(snapshotF2V1);
+        assertNotNull(createdSnapshotF2V1);
+
+        final VersionedFlowSnapshot snapshotF2V2 = createSnapshot(createdB2, createdF2, 2, "f2v2");
+        final VersionedFlowSnapshot createdSnapshotF2V2 = client.getFlowSnapshotClient().create(snapshotF2V2);
+        assertNotNull(createdSnapshotF2V2);
+
+        // Verify retrieving flow versions...
+
+        final VersionedFlowSnapshot retrievedSnapshotF1V1 = client.getFlowSnapshotClient().get(createdF1.getIdentifier(), 1);
+        assertNotNull(retrievedSnapshotF1V1);
+        assertNotNull(retrievedSnapshotF1V1.getFlowContents());
+        assertEquals("f1v1", retrievedSnapshotF1V1.getFlowContents().getName());
+
+        final VersionedFlowSnapshot retrievedSnapshotF1V2 = client.getFlowSnapshotClient().get(createdF1.getIdentifier(), 2);
+        assertNotNull(retrievedSnapshotF1V2);
+        assertNotNull(retrievedSnapshotF1V2.getFlowContents());
+        assertEquals("f1v2", retrievedSnapshotF1V2.getFlowContents().getName());
+
+        // Verify deleting a flow...
+
+        client.getFlowClient().delete(createdB1.getIdentifier(), createdF1.getIdentifier());
+
+        // All versions of f1 should be deleted
+        try {
+            client.getFlowSnapshotClient().get(createdF1.getIdentifier(), 1);
+            fail("Should have thrown exception");
+        } catch (NiFiRegistryException nre) {
+        }
+
+        // Versions of f2 should still exist...
+        final VersionedFlowSnapshot retrievedSnapshotF2V1 = client.getFlowSnapshotClient().get(createdF2.getIdentifier(), 1);
+        assertNotNull(retrievedSnapshotF2V1);
+        assertNotNull(retrievedSnapshotF2V1.getFlowContents());
+        assertEquals("f2v1", retrievedSnapshotF2V1.getFlowContents().getName());
+    }
+
+    private VersionedFlowSnapshot createSnapshot(final Bucket bucket, final VersionedFlow flow, final int version, final String rootPgName) {
+        final VersionedProcessGroup rootPg = new VersionedProcessGroup();
+        rootPg.setName(rootPgName);
+
+        final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+        snapshotMetadata.setBucketIdentifier(bucket.getIdentifier());
+        snapshotMetadata.setFlowIdentifier(flow.getIdentifier());
+        snapshotMetadata.setVersion(version);
+        snapshotMetadata.setComments("comments");
+
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setFlowContents(rootPg);
+        snapshot.setSnapshotMetadata(snapshotMetadata);
+        return snapshot;
+    }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-web-api/src/test/resources/application-ITDBFlowStorage.properties
similarity index 77%
copy from nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
copy to nifi-registry-core/nifi-registry-web-api/src/test/resources/application-ITDBFlowStorage.properties
index e456fa2..91e653c 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/resources/application-ITDBFlowStorage.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,5 +13,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
-org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
\ No newline at end of file
+#
+
+# Integration Test Profile for running an unsecured NiFi Registry instance
+
+# Custom (non-standard to Spring Boot) properties
+nifi.registry.properties.file = src/test/resources/conf/db-flow-storage/nifi-registry.properties
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/db-flow-storage/nifi-registry.properties
similarity index 66%
copy from nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
copy to nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/db-flow-storage/nifi-registry.properties
index e456fa2..dd50ae6 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/db-flow-storage/nifi-registry.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -12,5 +13,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
-org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
\ No newline at end of file
+#
+
+# web properties #
+nifi.registry.web.http.host=localhost
+
+# providers properties #
+nifi.registry.providers.configuration.file=./target/test-classes/conf/providers-db-flow-storage.xml
+
+# extensions working dir #
+nifi.registry.extensions.working.directory=./target/work/extensions
+
+# database properties
+nifi.registry.db.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/providers-db-flow-storage.xml b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/providers-db-flow-storage.xml
new file mode 100644
index 0000000..2ec132a
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/providers-db-flow-storage.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<providers>
+
+    <flowPersistenceProvider>
+        <class>org.apache.nifi.registry.provider.flow.DatabaseFlowPersistenceProvider</class>
+    </flowPersistenceProvider>
+
+    <extensionBundlePersistenceProvider>
+        <class>org.apache.nifi.registry.provider.extension.FileSystemBundlePersistenceProvider</class>
+        <property name="Extension Bundle Storage Directory">./target/test-classes/extension_bundles</property>
+    </extensionBundlePersistenceProvider>
+
+</providers>
\ No newline at end of file
diff --git a/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java b/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java
index a917c19..a510281 100644
--- a/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java
+++ b/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java
@@ -40,6 +40,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jdbc.core.JdbcTemplate;
 
+import javax.sql.DataSource;
+
 public class FlowPersistenceProviderMigrator {
     private static final Logger log = LoggerFactory.getLogger(FlowPersistenceProviderMigrator.class);
     public static final int PARSE_EXCEPTION = 1;
@@ -81,9 +83,10 @@ public class FlowPersistenceProviderMigrator {
 
         NiFiRegistryProperties fromProperties = NiFiRegistry.initializeProperties(NiFiRegistry.getMasterKeyProvider());
 
-        DatabaseMetadataService fromMetadataService = new DatabaseMetadataService(new JdbcTemplate(new DataSourceFactory(fromProperties).getDataSource()));
-        FlowPersistenceProvider fromPersistenceProvider = createFlowPersistenceProvider(fromProperties);
-        FlowPersistenceProvider toPersistenceProvider = createFlowPersistenceProvider(createToProperties(commandLine, fromProperties));
+        DataSource dataSource = new DataSourceFactory(fromProperties).getDataSource();
+        DatabaseMetadataService fromMetadataService = new DatabaseMetadataService(new JdbcTemplate(dataSource));
+        FlowPersistenceProvider fromPersistenceProvider = createFlowPersistenceProvider(fromProperties, dataSource);
+        FlowPersistenceProvider toPersistenceProvider = createFlowPersistenceProvider(createToProperties(commandLine, fromProperties), dataSource);
 
         new FlowPersistenceProviderMigrator().doMigrate(fromMetadataService, fromPersistenceProvider, toPersistenceProvider);
     }
@@ -97,10 +100,10 @@ public class FlowPersistenceProviderMigrator {
         return toProperties;
     }
 
-    private static FlowPersistenceProvider createFlowPersistenceProvider(NiFiRegistryProperties niFiRegistryProperties) {
+    private static FlowPersistenceProvider createFlowPersistenceProvider(NiFiRegistryProperties niFiRegistryProperties, DataSource dataSource) {
         ExtensionManager fromExtensionManager = new ExtensionManager(niFiRegistryProperties);
         fromExtensionManager.discoverExtensions();
-        StandardProviderFactory fromProviderFactory = new StandardProviderFactory(niFiRegistryProperties, fromExtensionManager);
+        StandardProviderFactory fromProviderFactory = new StandardProviderFactory(niFiRegistryProperties, fromExtensionManager, dataSource);
         fromProviderFactory.initialize();
         return fromProviderFactory.getFlowPersistenceProvider();
     }