You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2019/08/05 18:34:48 UTC
[nifi-registry] branch master updated: NIFIREG-285 - Add
DatabaseFlowPersistenceProvider
This is an automated email from the ASF dual-hosted git repository.
kdoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-registry.git
The following commit(s) were added to refs/heads/master by this push:
new 6f19a73 NIFIREG-285 - Add DatabaseFlowPersistenceProvider
6f19a73 is described below
commit 6f19a736c00242affb82a0d326e686be5a2e1b50
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Mon Jul 15 11:49:21 2019 -0400
NIFIREG-285 - Add DatabaseFlowPersistenceProvider
- Added method injection of DataSource in StandardProviderFactory
- Split out postgres migrations to use BYTEA for blob type
This closes #205.
Signed-off-by: Kevin Doran <kd...@apache.org>
---
.../src/main/asciidoc/administration-guide.adoc | 23 ++-
.../registry/db/CustomFlywayConfiguration.java | 7 +
.../registry/provider/StandardProviderFactory.java | 52 +++++-
.../flow/DatabaseFlowPersistenceProvider.java | 86 ++++++++++
...ache.nifi.registry.flow.FlowPersistenceProvider | 3 +-
.../migration/default/V6__AddFlowPersistence.sql | 22 +++
.../db/migration/mysql/V6__AddFlowPersistence.sql | 22 +++
.../db/migration/postgres/V2__Initial.sql | 60 +++++++
.../db/migration/postgres/V3__AddExtensions.sql | 105 ++++++++++++
.../migration/postgres/V4__AddCascadeOnDelete.sql | 23 +++
.../postgres/V5__AddBucketPublicFlags.sql | 16 ++
.../migration/postgres/V6__AddFlowPersistence.sql | 22 +++
.../provider/TestStandardProviderFactory.java | 18 ++-
.../flow/TestDatabaseFlowPersistenceProvider.java | 100 ++++++++++++
.../provider/hook/TestScriptEventHookProvider.java | 12 +-
.../nifi/registry/provider/ProviderContext.java | 34 ++++
.../src/main/resources/conf/providers.xml | 6 +
.../nifi/registry/web/api/DBFlowStorageIT.java | 178 +++++++++++++++++++++
.../application-ITDBFlowStorage.properties} | 9 +-
.../conf/db-flow-storage/nifi-registry.properties} | 16 +-
.../resources/conf/providers-db-flow-storage.xml | 29 ++++
.../FlowPersistenceProviderMigrator.java | 13 +-
22 files changed, 832 insertions(+), 24 deletions(-)
diff --git a/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc b/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc
index 29b5589..a28dc50 100644
--- a/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-registry-core/nifi-registry-docs/src/main/asciidoc/administration-guide.adoc
@@ -1118,7 +1118,7 @@ Currently, NiFi Registry supports using H2, Postgres 9.x, and MySQL (5.6, 5.7, 8
NOTE: NiFi Registry 0.1.0 only supports H2.
-== H2
+=== H2
H2 is an embedded database that is pre-configured in the default _nifi-registry.properties_ file. The contents of the H2 database are stored in a file on the local filesystem.
@@ -1130,7 +1130,7 @@ For NiFi Registry 0.2.0 and forward, the location of the H2 database is specifie
`nifi.registry.db.url=jdbc:h2:./database/nifi-registry-primary;`
-== Postgres
+=== Postgres
Postgres provides the option to use an externally located database that also supports high availability.
@@ -1159,7 +1159,7 @@ The following steps are required to use Postgres:
nifi.registry.db.username=nifireg
nifi.registry.db.password=changeme
-== MySQL
+=== MySQL
MySQL also provides the option to use an externally located database that also supports high availability.
@@ -1357,6 +1357,23 @@ Host bitbucket.org
IdentityFile ~/.ssh/key-for-bitbucket
....
+==== DatabaseFlowPersistenceProvider
+
+`DatabaseFlowPersistenceProvider` stores flow contents in a database table.
+
+This provider leverages the same database used for the metadata database, so there is no configuration to provide since the
+connection details will come from the database properties in `nifi-registry.properties`.
+
+The database table is named `FLOW_PERSISTENCE_PROVIDER` and has the following schema:
+
+|====
+|*Column*|*Description*
+|BUCKET_ID|The identifier of the bucket where the flow is located.
+|FLOW_ID|The identifier of the flow.
+|VERSION|The version of the flow.
+|FLOW_CONTENT|The serialized bytes of the flow content stored as a BLOB.
+|====
+
==== Switching from other Flow Persistence Provider
In order to switch the Flow Persistence Provider, it is necessary to reset NiFi Registry.
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java
index 0cbf64f..0288f9d 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/CustomFlywayConfiguration.java
@@ -41,6 +41,9 @@ public class CustomFlywayConfiguration implements FlywayConfigurationCustomizer
private static final String LOCATION_MYSQL = "classpath:db/migration/mysql";
private static final String[] LOCATIONS_MYSQL = {LOCATION_COMMON, LOCATION_MYSQL};
+ private static final String LOCATION_POSTGRES = "classpath:db/migration/postgres";
+ private static final String[] LOCATIONS_POSTGRES = {LOCATION_COMMON, LOCATION_POSTGRES};
+
@Override
public void customize(final FluentConfiguration configuration) {
final DatabaseType databaseType = getDatabaseType(configuration.getDataSource());
@@ -51,6 +54,10 @@ public class CustomFlywayConfiguration implements FlywayConfigurationCustomizer
LOGGER.info("Setting migration locations to {}", new Object[] {LOCATIONS_MYSQL});
configuration.locations(LOCATIONS_MYSQL);
break;
+ case POSTGRESQL:
+ LOGGER.info("Setting migration locations to {}", new Object[] {LOCATIONS_POSTGRES});
+ configuration.locations(LOCATIONS_POSTGRES);
+ break;
default:
LOGGER.info("Setting migration locations to {}", new Object[] {LOCATIONS_DEFAULT});
configuration.locations(LOCATIONS_DEFAULT);
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
index acd7705..c9eb9f5 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
@@ -32,6 +32,7 @@ import org.springframework.context.annotation.Configuration;
import org.xml.sax.SAXException;
import javax.annotation.PostConstruct;
+import javax.sql.DataSource;
import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
@@ -42,6 +43,8 @@ import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import java.io.File;
import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -73,6 +76,7 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
private final NiFiRegistryProperties properties;
private final ExtensionManager extensionManager;
+ private final DataSource dataSource;
private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null);
private FlowPersistenceProvider flowPersistenceProvider;
@@ -80,9 +84,10 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
private BundlePersistenceProvider bundlePersistenceProvider;
@Autowired
- public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) {
+ public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager, final DataSource dataSource) {
this.properties = properties;
this.extensionManager = extensionManager;
+ this.dataSource = dataSource;
if (this.properties == null) {
throw new IllegalStateException("NiFiRegistryProperties cannot be null");
@@ -91,6 +96,10 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
if (this.extensionManager == null) {
throw new IllegalStateException("ExtensionManager cannot be null");
}
+
+ if (this.dataSource == null) {
+ throw new IllegalStateException("DataSource cannot be null");
+ }
}
@PostConstruct
@@ -144,14 +153,16 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
final Constructor constructor = flowProviderClass.getConstructor();
flowPersistenceProvider = (FlowPersistenceProvider) constructor.newInstance();
- LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName});
+ performMethodInjection(flowPersistenceProvider, flowProviderClass);
+
+ LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[]{flowProviderClassName});
} catch (Exception e) {
throw new ProviderFactoryException("Error creating FlowPersistenceProvider with class name: " + flowProviderClassName, e);
}
final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbFlowProvider.getProperty());
flowPersistenceProvider.onConfigured(configurationContext);
- LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName});
+ LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[]{flowProviderClassName});
}
return flowPersistenceProvider;
@@ -192,6 +203,8 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
final Constructor constructor = hookProviderClass.getConstructor();
hook = (EventHookProvider) constructor.newInstance();
+ performMethodInjection(hook, hookProviderClass);
+
LOGGER.info("Instantiated EventHookProvider with class name {}", new Object[] {hookProviderClassName});
} catch (Exception e) {
throw new ProviderFactoryException("Error creating EventHookProvider with class name: " + hookProviderClassName, e);
@@ -233,6 +246,8 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
final Constructor constructor = extensionBundleProviderClass.getConstructor();
bundlePersistenceProvider = (BundlePersistenceProvider) constructor.newInstance();
+ performMethodInjection(bundlePersistenceProvider, extensionBundleProviderClass);
+
LOGGER.info("Instantiated BundlePersistenceProvider with class name {}", new Object[] {extensionBundleProviderClassName});
} catch (Exception e) {
throw new ProviderFactoryException("Error creating BundlePersistenceProvider with class name: " + extensionBundleProviderClassName, e);
@@ -273,4 +288,35 @@ public class StandardProviderFactory implements ProviderFactory, DisposableBean
return new StandardProviderConfigurationContext(properties);
}
+ private void performMethodInjection(final Object instance, final Class providerClass) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+ for (final Method method : providerClass.getMethods()) {
+ if (method.isAnnotationPresent(ProviderContext.class)) {
+ // make the method accessible
+ final boolean isAccessible = method.isAccessible();
+ method.setAccessible(true);
+
+ try {
+ final Class<?>[] argumentTypes = method.getParameterTypes();
+
+ // look for setters (single argument)
+ if (argumentTypes.length == 1) {
+ final Class<?> argumentType = argumentTypes[0];
+
+ // look for well known types, currently we only support injecting the DataSource
+ if (DataSource.class.isAssignableFrom(argumentType)) {
+ method.invoke(instance, dataSource);
+ }
+ }
+ } finally {
+ method.setAccessible(isAccessible);
+ }
+ }
+ }
+
+ final Class parentClass = providerClass.getSuperclass();
+ if (parentClass != null && Provider.class.isAssignableFrom(parentClass)) {
+ performMethodInjection(instance, parentClass);
+ }
+ }
+
}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/DatabaseFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/DatabaseFlowPersistenceProvider.java
new file mode 100644
index 0000000..ee38592
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/DatabaseFlowPersistenceProvider.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import javax.sql.DataSource;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A FlowPersistenceProvider that uses a database table for storage. The intent is to use the same database as the rest
+ * of the application so that all data can be stored together and benefit from any replication/scaling of the database.
+ */
+public class DatabaseFlowPersistenceProvider implements FlowPersistenceProvider {
+
+ private DataSource dataSource;
+ private JdbcTemplate jdbcTemplate;
+
+ @ProviderContext
+ public void setDataSource(final DataSource dataSource) {
+ this.dataSource = dataSource;
+ this.jdbcTemplate = new JdbcTemplate(this.dataSource);
+ }
+
+ @Override
+ public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+ // there is no config since we get the DataSource from the framework
+ }
+
+ @Override
+ public void saveFlowContent(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException {
+ final String sql = "INSERT INTO FLOW_PERSISTENCE_PROVIDER (BUCKET_ID, FLOW_ID, VERSION, FLOW_CONTENT) VALUES (?, ?, ?, ?)";
+ jdbcTemplate.update(sql, context.getBucketId(), context.getFlowId(), context.getVersion(), content);
+ }
+
+ @Override
+ public byte[] getFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+ final List<byte[]> results = new ArrayList<>();
+ final String sql = "SELECT FLOW_CONTENT FROM FLOW_PERSISTENCE_PROVIDER WHERE BUCKET_ID = ? and FLOW_ID = ? and VERSION = ?";
+
+ jdbcTemplate.query(sql, new Object[] {bucketId, flowId, version}, (rs) -> {
+ final byte[] content = rs.getBytes("FLOW_CONTENT");
+ results.add(content);
+ });
+
+ if (results.isEmpty()) {
+ return null;
+ } else {
+ return results.get(0);
+ }
+ }
+
+ @Override
+ public void deleteAllFlowContent(final String bucketId, final String flowId) throws FlowPersistenceException {
+ final String sql = "DELETE FROM FLOW_PERSISTENCE_PROVIDER WHERE BUCKET_ID = ? and FLOW_ID = ?";
+ jdbcTemplate.update(sql, bucketId, flowId);
+ }
+
+ @Override
+ public void deleteFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+ final String sql = "DELETE FROM FLOW_PERSISTENCE_PROVIDER WHERE BUCKET_ID = ? and FLOW_ID = ? and VERSION = ?";
+ jdbcTemplate.update(sql, bucketId, flowId, version);
+ }
+
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
index e456fa2..df57a73 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
-org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
\ No newline at end of file
+org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
+org.apache.nifi.registry.provider.flow.DatabaseFlowPersistenceProvider
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/default/V6__AddFlowPersistence.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/default/V6__AddFlowPersistence.sql
new file mode 100644
index 0000000..0765b70
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/default/V6__AddFlowPersistence.sql
@@ -0,0 +1,22 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE FLOW_PERSISTENCE_PROVIDER (
+ BUCKET_ID VARCHAR(50) NOT NULL,
+ FLOW_ID VARCHAR(50) NOT NULL,
+ VERSION INT NOT NULL,
+ FLOW_CONTENT BLOB NOT NULL,
+ CONSTRAINT PK__FLOW_PERSISTENCE_PROVIDER PRIMARY KEY (BUCKET_ID, FLOW_ID, VERSION)
+);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/mysql/V6__AddFlowPersistence.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/mysql/V6__AddFlowPersistence.sql
new file mode 100644
index 0000000..ad3d53f
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/mysql/V6__AddFlowPersistence.sql
@@ -0,0 +1,22 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE FLOW_PERSISTENCE_PROVIDER (
+ BUCKET_ID VARCHAR(50) NOT NULL,
+ FLOW_ID VARCHAR(50) NOT NULL,
+ VERSION INT NOT NULL,
+ FLOW_CONTENT LONGBLOB NOT NULL,
+ CONSTRAINT PK__FLOW_PERSISTENCE_PROVIDER PRIMARY KEY (BUCKET_ID, FLOW_ID, VERSION)
+);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V2__Initial.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V2__Initial.sql
new file mode 100644
index 0000000..b992d23
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V2__Initial.sql
@@ -0,0 +1,60 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- The NAME column has a max size of 768 because this is the largest size that MySQL allows when using a unique constraint.
+CREATE TABLE BUCKET (
+ ID VARCHAR(50) NOT NULL,
+ NAME VARCHAR(1000) NOT NULL,
+ DESCRIPTION TEXT,
+ CREATED TIMESTAMP NOT NULL,
+ CONSTRAINT PK__BUCKET_ID PRIMARY KEY (ID),
+ CONSTRAINT UNIQUE__BUCKET_NAME UNIQUE (NAME)
+);
+
+CREATE TABLE BUCKET_ITEM (
+ ID VARCHAR(50) NOT NULL,
+ NAME VARCHAR(1000) NOT NULL,
+ DESCRIPTION TEXT,
+ CREATED TIMESTAMP NOT NULL,
+ MODIFIED TIMESTAMP NOT NULL,
+ ITEM_TYPE VARCHAR(50) NOT NULL,
+ BUCKET_ID VARCHAR(50) NOT NULL,
+ CONSTRAINT PK__BUCKET_ITEM_ID PRIMARY KEY (ID),
+ CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID)
+);
+
+CREATE TABLE FLOW (
+ ID VARCHAR(50) NOT NULL,
+ CONSTRAINT PK__FLOW_ID PRIMARY KEY (ID),
+ CONSTRAINT FK__FLOW_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID)
+);
+
+CREATE TABLE FLOW_SNAPSHOT (
+ FLOW_ID VARCHAR(50) NOT NULL,
+ VERSION INT NOT NULL,
+ CREATED TIMESTAMP NOT NULL,
+ CREATED_BY VARCHAR(4096) NOT NULL,
+ COMMENTS TEXT,
+ CONSTRAINT PK__FLOW_SNAPSHOT_FLOW_ID_AND_VERSION PRIMARY KEY (FLOW_ID, VERSION),
+ CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID)
+);
+
+CREATE TABLE SIGNING_KEY (
+ ID VARCHAR(50) NOT NULL,
+ TENANT_IDENTITY VARCHAR(4096) NOT NULL,
+ KEY_VALUE VARCHAR(50) NOT NULL,
+ CONSTRAINT PK__SIGNING_KEY_ID PRIMARY KEY (ID),
+ CONSTRAINT UNIQUE__SIGNING_KEY_TENANT_IDENTITY UNIQUE (TENANT_IDENTITY)
+);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V3__AddExtensions.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V3__AddExtensions.sql
new file mode 100644
index 0000000..3bd9820
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V3__AddExtensions.sql
@@ -0,0 +1,105 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE BUNDLE (
+ ID VARCHAR(50) NOT NULL,
+ BUCKET_ID VARCHAR(50) NOT NULL,
+ BUNDLE_TYPE VARCHAR(200) NOT NULL,
+ GROUP_ID VARCHAR(500) NOT NULL,
+ ARTIFACT_ID VARCHAR(500) NOT NULL,
+ CONSTRAINT PK__EXTENSION_BUNDLE_ID PRIMARY KEY (ID),
+ CONSTRAINT FK__EXTENSION_BUNDLE_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID) ON DELETE CASCADE,
+ CONSTRAINT FK__EXTENSION_BUNDLE_BUCKET_ID FOREIGN KEY(BUCKET_ID) REFERENCES BUCKET(ID) ON DELETE CASCADE,
+ CONSTRAINT UNIQUE__EXTENSION_BUNDLE_BUCKET_GROUP_ARTIFACT UNIQUE (BUCKET_ID, GROUP_ID, ARTIFACT_ID)
+);
+
+CREATE TABLE BUNDLE_VERSION (
+ ID VARCHAR(50) NOT NULL,
+ BUNDLE_ID VARCHAR(50) NOT NULL,
+ VERSION VARCHAR(100) NOT NULL,
+ CREATED TIMESTAMP NOT NULL,
+ CREATED_BY VARCHAR(4096) NOT NULL,
+ DESCRIPTION TEXT,
+ SHA_256_HEX VARCHAR(512) NOT NULL,
+ SHA_256_SUPPLIED INT NOT NULL,
+ CONTENT_SIZE BIGINT NOT NULL,
+ SYSTEM_API_VERSION VARCHAR(50),
+ BUILD_TOOL VARCHAR(100),
+ BUILD_FLAGS VARCHAR(100),
+ BUILD_BRANCH VARCHAR(200),
+ BUILD_TAG VARCHAR(200),
+ BUILD_REVISION VARCHAR(100),
+ BUILT TIMESTAMP,
+ BUILT_BY VARCHAR(4096),
+ CONSTRAINT PK__BUNDLE_VERSION_ID PRIMARY KEY (ID),
+ CONSTRAINT FK__BUNDLE_VERSION_BUNDLE_ID FOREIGN KEY (BUNDLE_ID) REFERENCES BUNDLE(ID) ON DELETE CASCADE,
+ CONSTRAINT UNIQUE__BUNDLE_VERSION_BUNDLE_ID_VERSION UNIQUE (BUNDLE_ID, VERSION)
+);
+
+CREATE TABLE BUNDLE_VERSION_DEPENDENCY (
+ ID VARCHAR(50) NOT NULL,
+ BUNDLE_VERSION_ID VARCHAR(50) NOT NULL,
+ GROUP_ID VARCHAR(500) NOT NULL,
+ ARTIFACT_ID VARCHAR(500) NOT NULL,
+ VERSION VARCHAR(100) NOT NULL,
+ CONSTRAINT PK__BUNDLE_VERSION_DEPENDENCY_ID PRIMARY KEY (ID),
+ CONSTRAINT FK__BUNDLE_VERSION_DEPENDENCY_BUNDLE_VERSION_ID FOREIGN KEY (BUNDLE_VERSION_ID) REFERENCES BUNDLE_VERSION(ID) ON DELETE CASCADE,
+ CONSTRAINT UNIQUE__BUNDLE_VERSION_DEPENDENCY_BUNDLE_ID_GROUP_ARTIFACT_VERSION UNIQUE (BUNDLE_VERSION_ID, GROUP_ID, ARTIFACT_ID, VERSION)
+);
+
+CREATE TABLE EXTENSION (
+ ID VARCHAR(50) NOT NULL,
+ BUNDLE_VERSION_ID VARCHAR(50) NOT NULL,
+ NAME VARCHAR(500) NOT NULL,
+ DISPLAY_NAME VARCHAR(500) NOT NULL,
+ TYPE VARCHAR(100) NOT NULL,
+ CONTENT TEXT NOT NULL,
+ ADDITIONAL_DETAILS TEXT,
+ HAS_ADDITIONAL_DETAILS INT NOT NULL,
+ CONSTRAINT PK__EXTENSION_ID PRIMARY KEY (ID),
+ CONSTRAINT FK__EXTENSION_BUNDLE_VERSION_ID FOREIGN KEY (BUNDLE_VERSION_ID) REFERENCES BUNDLE_VERSION(ID) ON DELETE CASCADE,
+ CONSTRAINT UNIQUE__EXTENSION_BUNDLE_VERSION_ID_AND_NAME UNIQUE (BUNDLE_VERSION_ID, NAME)
+);
+
+CREATE TABLE EXTENSION_PROVIDED_SERVICE_API (
+ ID VARCHAR(50) NOT NULL,
+ EXTENSION_ID VARCHAR(50) NOT NULL,
+ CLASS_NAME VARCHAR (500) NOT NULL,
+ GROUP_ID VARCHAR(500) NOT NULL,
+ ARTIFACT_ID VARCHAR(500) NOT NULL,
+ VERSION VARCHAR(100) NOT NULL,
+ CONSTRAINT PK__EXTENSION_PROVIDED_SERVICE_API_ID PRIMARY KEY (ID),
+ CONSTRAINT FK__EXTENSION_PROVIDED_SERVICE_API_EXTENSION_ID FOREIGN KEY (EXTENSION_ID) REFERENCES EXTENSION(ID) ON DELETE CASCADE,
+ CONSTRAINT UNIQUE__EXTENSION_PROVIDED_SERVICE_API UNIQUE (EXTENSION_ID, CLASS_NAME, GROUP_ID, ARTIFACT_ID, VERSION)
+);
+
+CREATE TABLE EXTENSION_RESTRICTION (
+ ID VARCHAR(50) NOT NULL,
+ EXTENSION_ID VARCHAR(50) NOT NULL,
+ REQUIRED_PERMISSION VARCHAR(200) NOT NULL,
+ EXPLANATION VARCHAR (4096) NOT NULL,
+ CONSTRAINT PK__EXTENSION_RESTRICTION_ID PRIMARY KEY (ID),
+ CONSTRAINT FK__EXTENSION_RESTRICTION_EXTENSION_ID FOREIGN KEY (EXTENSION_ID) REFERENCES EXTENSION(ID) ON DELETE CASCADE,
+ CONSTRAINT UNIQUE__EXTENSION_RESTRICTION_EXTENSION_ID_REQUIRED_PERMISSION UNIQUE (EXTENSION_ID, REQUIRED_PERMISSION)
+);
+
+CREATE TABLE EXTENSION_TAG (
+ EXTENSION_ID VARCHAR(50) NOT NULL,
+ TAG VARCHAR(200) NOT NULL,
+ CONSTRAINT PK__EXTENSION_TAG_EXTENSION_ID_AND_TAG PRIMARY KEY (EXTENSION_ID, TAG),
+ CONSTRAINT FK__EXTENSION_TAG_EXTENSION_ID FOREIGN KEY (EXTENSION_ID) REFERENCES EXTENSION(ID) ON DELETE CASCADE
+);
+
+ALTER TABLE BUCKET ADD ALLOW_EXTENSION_BUNDLE_REDEPLOY INT NOT NULL DEFAULT (0);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V4__AddCascadeOnDelete.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V4__AddCascadeOnDelete.sql
new file mode 100644
index 0000000..5b0e6c6
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V4__AddCascadeOnDelete.sql
@@ -0,0 +1,23 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+ALTER TABLE BUCKET_ITEM DROP CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID;
+ALTER TABLE BUCKET_ITEM ADD CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID) ON DELETE CASCADE;
+
+ALTER TABLE FLOW DROP CONSTRAINT FK__FLOW_BUCKET_ITEM_ID;
+ALTER TABLE FLOW ADD CONSTRAINT FK__FLOW_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID) ON DELETE CASCADE;
+
+ALTER TABLE FLOW_SNAPSHOT DROP CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID;
+ALTER TABLE FLOW_SNAPSHOT ADD CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID) ON DELETE CASCADE;
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V5__AddBucketPublicFlags.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V5__AddBucketPublicFlags.sql
new file mode 100644
index 0000000..ef7478b
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V5__AddBucketPublicFlags.sql
@@ -0,0 +1,16 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+ALTER TABLE BUCKET ADD ALLOW_PUBLIC_READ INT NOT NULL DEFAULT (0);
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V6__AddFlowPersistence.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V6__AddFlowPersistence.sql
new file mode 100644
index 0000000..395760b
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/postgres/V6__AddFlowPersistence.sql
@@ -0,0 +1,22 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE FLOW_PERSISTENCE_PROVIDER (
+ BUCKET_ID VARCHAR(50) NOT NULL,
+ FLOW_ID VARCHAR(50) NOT NULL,
+ VERSION INT NOT NULL,
+ FLOW_CONTENT BYTEA NOT NULL,
+ CONSTRAINT PK__FLOW_PERSISTENCE_PROVIDER PRIMARY KEY (BUCKET_ID, FLOW_ID, VERSION)
+);
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
index ce374f7..0d23110 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/TestStandardProviderFactory.java
@@ -23,6 +23,8 @@ import org.apache.nifi.registry.properties.NiFiRegistryProperties;
import org.junit.Test;
import org.mockito.Mockito;
+import javax.sql.DataSource;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.any;
@@ -38,7 +40,9 @@ public class TestStandardProviderFactory {
final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
- final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+ final DataSource dataSource = Mockito.mock(DataSource.class);
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
providerFactory.initialize();
final FlowPersistenceProvider flowPersistenceProvider = providerFactory.getFlowPersistenceProvider();
@@ -66,7 +70,9 @@ public class TestStandardProviderFactory {
final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
- final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+ final DataSource dataSource = Mockito.mock(DataSource.class);
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
providerFactory.getFlowPersistenceProvider();
}
@@ -78,7 +84,9 @@ public class TestStandardProviderFactory {
final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
- final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+ final DataSource dataSource = Mockito.mock(DataSource.class);
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
providerFactory.initialize();
}
@@ -90,7 +98,9 @@ public class TestStandardProviderFactory {
final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
- final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+ final DataSource dataSource = Mockito.mock(DataSource.class);
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
providerFactory.initialize();
providerFactory.getFlowPersistenceProvider();
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestDatabaseFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestDatabaseFlowPersistenceProvider.java
new file mode 100644
index 0000000..5314860
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/flow/TestDatabaseFlowPersistenceProvider.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.nifi.registry.db.DatabaseTestApplication;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.TestExecutionListeners;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
+import org.springframework.test.context.transaction.TransactionalTestExecutionListener;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.sql.DataSource;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.when;
+
+@Transactional
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = DatabaseTestApplication.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
+@TestExecutionListeners({DependencyInjectionTestExecutionListener.class, TransactionalTestExecutionListener.class})
+public class TestDatabaseFlowPersistenceProvider {
+
+ @Autowired
+ private DataSource dataSource;
+
+ private FlowPersistenceProvider persistenceProvider;
+
+ @Before
+ public void setup() {
+ persistenceProvider = new DatabaseFlowPersistenceProvider();
+ ((DatabaseFlowPersistenceProvider)persistenceProvider).setDataSource(dataSource);
+ }
+
+ @Test
+ public void testAll() {
+ // Save two versions of a flow...
+ final FlowSnapshotContext context1 = getFlowSnapshotContext("b1", "f1", 1);
+ final byte[] content1 = "f1v1".getBytes(StandardCharsets.UTF_8);
+ persistenceProvider.saveFlowContent(context1, content1);
+
+ final FlowSnapshotContext context2 = getFlowSnapshotContext("b1", "f1", 2);
+ final byte[] content2 = "f1v2".getBytes(StandardCharsets.UTF_8);
+ persistenceProvider.saveFlowContent(context2, content2);
+
+ // Verify we can retrieve both versions and that the content is correct
+ final byte[] retrievedContent1 = persistenceProvider.getFlowContent(context1.getBucketId(), context1.getFlowId(), context1.getVersion());
+ assertNotNull(retrievedContent1);
+ assertEquals("f1v1", new String(retrievedContent1, StandardCharsets.UTF_8));
+
+ final byte[] retrievedContent2 = persistenceProvider.getFlowContent(context2.getBucketId(), context2.getFlowId(), context2.getVersion());
+ assertNotNull(retrievedContent2);
+ assertEquals("f1v2", new String(retrievedContent2, StandardCharsets.UTF_8));
+
+ // Delete a specific version and verify we can longer retrieve it
+ persistenceProvider.deleteFlowContent(context1.getBucketId(), context1.getFlowId(), context1.getVersion());
+
+ final byte[] deletedContent1 = persistenceProvider.getFlowContent(context1.getBucketId(), context1.getFlowId(), context1.getVersion());
+ assertNull(deletedContent1);
+
+ // Delete all content for a flow
+ persistenceProvider.deleteAllFlowContent(context1.getBucketId(), context1.getFlowId());
+
+ final byte[] deletedContent2 = persistenceProvider.getFlowContent(context2.getBucketId(), context2.getFlowId(), context2.getVersion());
+ assertNull(deletedContent2);
+ }
+
+ private FlowSnapshotContext getFlowSnapshotContext(final String bucketId, final String flowId, final int version) {
+ final FlowSnapshotContext context = Mockito.mock(FlowSnapshotContext.class);
+ when(context.getBucketId()).thenReturn(bucketId);
+ when(context.getFlowId()).thenReturn(flowId);
+ when(context.getVersion()).thenReturn(version);
+ return context;
+ }
+
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java
index ab24998..b7d241b 100644
--- a/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java
+++ b/nifi-registry-core/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/hook/TestScriptEventHookProvider.java
@@ -16,9 +16,6 @@
*/
package org.apache.nifi.registry.provider.hook;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.when;
-
import org.apache.nifi.registry.extension.ExtensionManager;
import org.apache.nifi.registry.properties.NiFiRegistryProperties;
import org.apache.nifi.registry.provider.ProviderCreationException;
@@ -27,6 +24,11 @@ import org.apache.nifi.registry.provider.StandardProviderFactory;
import org.junit.Test;
import org.mockito.Mockito;
+import javax.sql.DataSource;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
public class TestScriptEventHookProvider {
@Test(expected = ProviderCreationException.class)
@@ -37,7 +39,9 @@ public class TestScriptEventHookProvider {
final ExtensionManager extensionManager = Mockito.mock(ExtensionManager.class);
when(extensionManager.getExtensionClassLoader(any(String.class))).thenReturn(this.getClass().getClassLoader());
- final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager);
+ final DataSource dataSource = Mockito.mock(DataSource.class);
+
+ final ProviderFactory providerFactory = new StandardProviderFactory(props, extensionManager, dataSource);
providerFactory.initialize();
providerFactory.getEventHookProviders();
}
diff --git a/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderContext.java b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderContext.java
new file mode 100644
index 0000000..9729176
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/provider/ProviderContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation for setter methods in a provider to indicate the framework should inject the requested resource.
+ */
+@Documented
+@Target({ElementType.FIELD, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface ProviderContext {
+}
diff --git a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml
index f41eee8..3eb138f 100644
--- a/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml
+++ b/nifi-registry-core/nifi-registry-resources/src/main/resources/conf/providers.xml
@@ -37,6 +37,12 @@
-->
<!--
+ <flowPersistenceProvider>
+ <class>org.apache.nifi.registry.provider.flow.DatabaseFlowPersistenceProvider</class>
+ </flowPersistenceProvider>
+ -->
+
+ <!--
<eventHookProvider>
<class>org.apache.nifi.registry.provider.hook.ScriptEventHookProvider</class>
<property name="Script Path"></property>
diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/DBFlowStorageIT.java b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/DBFlowStorageIT.java
new file mode 100644
index 0000000..b438eaa
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/api/DBFlowStorageIT.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.web.api;
+
+import org.apache.nifi.registry.NiFiRegistryTestApiApplication;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryClientConfig;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.client.impl.JerseyNiFiRegistryClient;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.jdbc.Sql;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(
+ classes = NiFiRegistryTestApiApplication.class,
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+ properties = "spring.profiles.include=ITDBFlowStorage")
+@Sql(executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD, scripts = "classpath:db/clearDB.sql")
+public class DBFlowStorageIT extends IntegrationTestBase {
+
+ static final Logger LOGGER = LoggerFactory.getLogger(UnsecuredNiFiRegistryClientIT.class);
+
+ private NiFiRegistryClient client;
+
+ @Before
+ public void setup() throws IOException {
+ final String baseUrl = createBaseURL();
+ LOGGER.info("Using base url = " + baseUrl);
+
+ final NiFiRegistryClientConfig clientConfig = new NiFiRegistryClientConfig.Builder()
+ .baseUrl(baseUrl)
+ .build();
+ assertNotNull(clientConfig);
+
+ final NiFiRegistryClient client = new JerseyNiFiRegistryClient.Builder()
+ .config(clientConfig)
+ .build();
+ assertNotNull(client);
+ this.client = client;
+ }
+
+ @After
+ public void teardown() {
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOGGER.warn(e.getMessage(), e);
+ }
+ }
+
+ @Test
+ public void testAll() throws IOException, NiFiRegistryException {
+ // Create two buckets...
+
+ final Bucket b1 = new Bucket();
+ b1.setName("b1");
+
+ final Bucket createdB1 = client.getBucketClient().create(b1);
+ assertNotNull(createdB1);
+
+ final Bucket b2 = new Bucket();
+ b2.setName("b2");
+
+ final Bucket createdB2 = client.getBucketClient().create(b2);
+ assertNotNull(createdB2);
+
+ // Create two flows...
+
+ final VersionedFlow f1 = new VersionedFlow();
+ f1.setName("f1");
+ f1.setBucketIdentifier(createdB1.getIdentifier());
+
+ final VersionedFlow createdF1 = client.getFlowClient().create(f1);
+ assertNotNull(createdF1);
+
+ final VersionedFlow f2 = new VersionedFlow();
+ f2.setName("f2");
+ f2.setBucketIdentifier(createdB2.getIdentifier());
+
+ final VersionedFlow createdF2 = client.getFlowClient().create(f2);
+ assertNotNull(createdF2);
+
+ // Create some versions for each flow...
+
+ final VersionedFlowSnapshot snapshotF1V1 = createSnapshot(createdB1, createdF1, 1, "f1v1");
+ final VersionedFlowSnapshot createdSnapshotF1V1 = client.getFlowSnapshotClient().create(snapshotF1V1);
+ assertNotNull(createdSnapshotF1V1);
+
+ final VersionedFlowSnapshot snapshotF1V2 = createSnapshot(createdB1, createdF1, 2, "f1v2");
+ final VersionedFlowSnapshot createdSnapshotF1V2 = client.getFlowSnapshotClient().create(snapshotF1V2);
+ assertNotNull(createdSnapshotF1V2);
+
+ final VersionedFlowSnapshot snapshotF2V1 = createSnapshot(createdB2, createdF2, 1, "f2v1");
+ final VersionedFlowSnapshot createdSnapshotF2V1 = client.getFlowSnapshotClient().create(snapshotF2V1);
+ assertNotNull(createdSnapshotF2V1);
+
+ final VersionedFlowSnapshot snapshotF2V2 = createSnapshot(createdB2, createdF2, 2, "f2v2");
+ final VersionedFlowSnapshot createdSnapshotF2V2 = client.getFlowSnapshotClient().create(snapshotF2V2);
+ assertNotNull(createdSnapshotF2V2);
+
+ // Verify retrieving flow versions...
+
+ final VersionedFlowSnapshot retrievedSnapshotF1V1 = client.getFlowSnapshotClient().get(createdF1.getIdentifier(), 1);
+ assertNotNull(retrievedSnapshotF1V1);
+ assertNotNull(retrievedSnapshotF1V1.getFlowContents());
+ assertEquals("f1v1", retrievedSnapshotF1V1.getFlowContents().getName());
+
+ final VersionedFlowSnapshot retrievedSnapshotF1V2 = client.getFlowSnapshotClient().get(createdF1.getIdentifier(), 2);
+ assertNotNull(retrievedSnapshotF1V2);
+ assertNotNull(retrievedSnapshotF1V2.getFlowContents());
+ assertEquals("f1v2", retrievedSnapshotF1V2.getFlowContents().getName());
+
+ // Verify deleting a flow...
+
+ client.getFlowClient().delete(createdB1.getIdentifier(), createdF1.getIdentifier());
+
+ // All versions of f1 should be deleted
+ try {
+ client.getFlowSnapshotClient().get(createdF1.getIdentifier(), 1);
+ fail("Should have thrown exception");
+ } catch (NiFiRegistryException nre) {
+ }
+
+ // Versions of f2 should still exist...
+ final VersionedFlowSnapshot retrievedSnapshotF2V1 = client.getFlowSnapshotClient().get(createdF2.getIdentifier(), 1);
+ assertNotNull(retrievedSnapshotF2V1);
+ assertNotNull(retrievedSnapshotF2V1.getFlowContents());
+ assertEquals("f2v1", retrievedSnapshotF2V1.getFlowContents().getName());
+ }
+
+ private VersionedFlowSnapshot createSnapshot(final Bucket bucket, final VersionedFlow flow, final int version, final String rootPgName) {
+ final VersionedProcessGroup rootPg = new VersionedProcessGroup();
+ rootPg.setName(rootPgName);
+
+ final VersionedFlowSnapshotMetadata snapshotMetadata = new VersionedFlowSnapshotMetadata();
+ snapshotMetadata.setBucketIdentifier(bucket.getIdentifier());
+ snapshotMetadata.setFlowIdentifier(flow.getIdentifier());
+ snapshotMetadata.setVersion(version);
+ snapshotMetadata.setComments("comments");
+
+ final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+ snapshot.setFlowContents(rootPg);
+ snapshot.setSnapshotMetadata(snapshotMetadata);
+ return snapshot;
+ }
+}
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-web-api/src/test/resources/application-ITDBFlowStorage.properties
similarity index 77%
copy from nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
copy to nifi-registry-core/nifi-registry-web-api/src/test/resources/application-ITDBFlowStorage.properties
index e456fa2..91e653c 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/resources/application-ITDBFlowStorage.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -12,5 +13,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
-org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
\ No newline at end of file
+#
+
+# Integration Test Profile for running an unsecured NiFi Registry instance
+
+# Custom (non-standard to Spring Boot) properties
+nifi.registry.properties.file = src/test/resources/conf/db-flow-storage/nifi-registry.properties
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/db-flow-storage/nifi-registry.properties
similarity index 66%
copy from nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
copy to nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/db-flow-storage/nifi-registry.properties
index e456fa2..dd50ae6 100644
--- a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/db-flow-storage/nifi-registry.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
@@ -12,5 +13,16 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
-org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
\ No newline at end of file
+#
+
+# web properties #
+nifi.registry.web.http.host=localhost
+
+# providers properties #
+nifi.registry.providers.configuration.file=./target/test-classes/conf/providers-db-flow-storage.xml
+
+# extensions working dir #
+nifi.registry.extensions.working.directory=./target/work/extensions
+
+# database properties
+nifi.registry.db.url.append=;LOCK_TIMEOUT=25000;WRITE_DELAY=0;AUTO_SERVER=FALSE
\ No newline at end of file
diff --git a/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/providers-db-flow-storage.xml b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/providers-db-flow-storage.xml
new file mode 100644
index 0000000..2ec132a
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-web-api/src/test/resources/conf/providers-db-flow-storage.xml
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<providers>
+
+ <flowPersistenceProvider>
+ <class>org.apache.nifi.registry.provider.flow.DatabaseFlowPersistenceProvider</class>
+ </flowPersistenceProvider>
+
+ <extensionBundlePersistenceProvider>
+ <class>org.apache.nifi.registry.provider.extension.FileSystemBundlePersistenceProvider</class>
+ <property name="Extension Bundle Storage Directory">./target/test-classes/extension_bundles</property>
+ </extensionBundlePersistenceProvider>
+
+</providers>
\ No newline at end of file
diff --git a/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java b/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java
index a917c19..a510281 100644
--- a/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java
+++ b/nifi-registry-toolkit/nifi-registry-toolkit-persistence/src/main/java/org/apache/nifi/registry/toolkit/persistence/FlowPersistenceProviderMigrator.java
@@ -40,6 +40,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.JdbcTemplate;
+import javax.sql.DataSource;
+
public class FlowPersistenceProviderMigrator {
private static final Logger log = LoggerFactory.getLogger(FlowPersistenceProviderMigrator.class);
public static final int PARSE_EXCEPTION = 1;
@@ -81,9 +83,10 @@ public class FlowPersistenceProviderMigrator {
NiFiRegistryProperties fromProperties = NiFiRegistry.initializeProperties(NiFiRegistry.getMasterKeyProvider());
- DatabaseMetadataService fromMetadataService = new DatabaseMetadataService(new JdbcTemplate(new DataSourceFactory(fromProperties).getDataSource()));
- FlowPersistenceProvider fromPersistenceProvider = createFlowPersistenceProvider(fromProperties);
- FlowPersistenceProvider toPersistenceProvider = createFlowPersistenceProvider(createToProperties(commandLine, fromProperties));
+ DataSource dataSource = new DataSourceFactory(fromProperties).getDataSource();
+ DatabaseMetadataService fromMetadataService = new DatabaseMetadataService(new JdbcTemplate(dataSource));
+ FlowPersistenceProvider fromPersistenceProvider = createFlowPersistenceProvider(fromProperties, dataSource);
+ FlowPersistenceProvider toPersistenceProvider = createFlowPersistenceProvider(createToProperties(commandLine, fromProperties), dataSource);
new FlowPersistenceProviderMigrator().doMigrate(fromMetadataService, fromPersistenceProvider, toPersistenceProvider);
}
@@ -97,10 +100,10 @@ public class FlowPersistenceProviderMigrator {
return toProperties;
}
- private static FlowPersistenceProvider createFlowPersistenceProvider(NiFiRegistryProperties niFiRegistryProperties) {
+ private static FlowPersistenceProvider createFlowPersistenceProvider(NiFiRegistryProperties niFiRegistryProperties, DataSource dataSource) {
ExtensionManager fromExtensionManager = new ExtensionManager(niFiRegistryProperties);
fromExtensionManager.discoverExtensions();
- StandardProviderFactory fromProviderFactory = new StandardProviderFactory(niFiRegistryProperties, fromExtensionManager);
+ StandardProviderFactory fromProviderFactory = new StandardProviderFactory(niFiRegistryProperties, fromExtensionManager, dataSource);
fromProviderFactory.initialize();
return fromProviderFactory.getFlowPersistenceProvider();
}