You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/09/21 13:50:40 UTC
[6/7] nifi-registry git commit: NIFIREG-18 Initial plumbling for H2
database - Setup Flyway with initial migration to define tables - Setup
entity classes with repositories - Setup unit testing for repositories -
Removed existing MetadataProvider concept
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
new file mode 100644
index 0000000..ddad946
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * A FlowPersistenceProvider that uses the local filesystem for storage.
+ */
+public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvider {
+
+ static final Logger LOGGER = LoggerFactory.getLogger(FileSystemFlowPersistenceProvider.class);
+
+ static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory";
+
+ static final String SNAPSHOT_EXTENSION = ".snapshot";
+
+ private File flowStorageDir;
+
+ @Override
+ public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+ final Map<String,String> props = configurationContext.getProperties();
+ if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) {
+ throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided");
+ }
+
+ final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP);
+ if (StringUtils.isBlank(flowStorageDirValue)) {
+ throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank");
+ }
+
+ try {
+ flowStorageDir = new File(flowStorageDirValue);
+ FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir);
+ LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", new Object[] {flowStorageDir.getAbsolutePath()});
+ } catch (IOException e) {
+ throw new ProviderCreationException(e);
+ }
+ }
+
+ @Override
+ public synchronized void saveSnapshot(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException {
+ final File bucketDir = new File(flowStorageDir, context.getBucketId());
+ try {
+ FileUtils.ensureDirectoryExistAndCanReadAndWrite(bucketDir);
+ } catch (IOException e) {
+ throw new FlowPersistenceException("Error accessing bucket directory at " + bucketDir.getAbsolutePath(), e);
+ }
+
+ final File flowDir = new File(bucketDir, context.getFlowId());
+ try {
+ FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowDir);
+ } catch (IOException e) {
+ throw new FlowPersistenceException("Error accessing flow directory at " + flowDir.getAbsolutePath(), e);
+ }
+
+ final String versionString = String.valueOf(context.getVersion());
+ final File versionDir = new File(flowDir, versionString);
+ try {
+ FileUtils.ensureDirectoryExistAndCanReadAndWrite(versionDir);
+ } catch (IOException e) {
+ throw new FlowPersistenceException("Error accessing version directory at " + versionDir.getAbsolutePath(), e);
+ }
+
+ final File versionFile = new File(versionDir, versionString + SNAPSHOT_EXTENSION);
+ if (versionFile.exists()) {
+ throw new FlowPersistenceException("Unable to save, a snapshot already exists with version " + versionString);
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Saving snapshot with filename {}", new Object[] {versionFile.getAbsolutePath()});
+ }
+
+ try (final OutputStream out = new FileOutputStream(versionFile)) {
+ out.write(content);
+ out.flush();
+ } catch (Exception e) {
+ throw new FlowPersistenceException("Unable to write snapshot to disk due to " + e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public synchronized byte[] getSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+ final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Retrieving snapshot with filename {}", new Object[] {snapshotFile.getAbsolutePath()});
+ }
+
+ if (!snapshotFile.exists()) {
+ return null;
+ }
+
+ try (final InputStream in = new FileInputStream(snapshotFile)){
+ return IOUtils.toByteArray(in);
+ } catch (IOException e) {
+ throw new FlowPersistenceException("Error reading snapshot file: " + snapshotFile.getAbsolutePath(), e);
+ }
+ }
+
+ @Override
+ public synchronized void deleteSnapshots(final String bucketId, final String flowId) throws FlowPersistenceException {
+ final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId);
+ if (!flowDir.exists()) {
+ LOGGER.debug("Snapshot directory does not exist at {}", new Object[] {flowDir.getAbsolutePath()});
+ return;
+ }
+
+ try {
+ org.apache.commons.io.FileUtils.cleanDirectory(flowDir);
+ } catch (IOException e) {
+ throw new FlowPersistenceException("Error deleting snapshots at " + flowDir.getAbsolutePath(), e);
+ }
+ }
+
+ @Override
+ public synchronized void deleteSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+ final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+ if (!snapshotFile.exists()) {
+ LOGGER.debug("Snapshot file does not exist at {}", new Object[] {snapshotFile.getAbsolutePath()});
+ return;
+ }
+
+ final boolean deleted = snapshotFile.delete();
+ if (!deleted) {
+ throw new FlowPersistenceException("Unable to delete snapshot at " + snapshotFile.getAbsolutePath());
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Deleted snapshot at {}", new Object[] {snapshotFile.getAbsolutePath()});
+ }
+ }
+
+ protected File getSnapshotFile(final String bucketId, final String flowId, final int version) {
+ final String snapshotFilename = bucketId + "/" + flowId + "/" + version + "/" + version + SNAPSHOT_EXTENSION;
+ return new File(flowStorageDir, snapshotFilename);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
new file mode 100644
index 0000000..5b1f058
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+
+/**
+ * Standard implementation of FlowSnapshotContext.
+ */
+public class StandardFlowSnapshotContext implements FlowSnapshotContext {
+
+ private final String bucketId;
+ private final String bucketName;
+ private final String flowId;
+ private final String flowName;
+ private final int version;
+ private final String comments;
+ private final long snapshotTimestamp;
+
+ private StandardFlowSnapshotContext(final Builder builder) {
+ this.bucketId = builder.bucketId;
+ this.bucketName = builder.bucketName;
+ this.flowId = builder.flowId;
+ this.flowName = builder.flowName;
+ this.version = builder.version;
+ this.comments = builder.comments;
+ this.snapshotTimestamp = builder.snapshotTimestamp;
+
+ Validate.notBlank(bucketId);
+ Validate.notBlank(bucketName);
+ Validate.notBlank(flowId);
+ Validate.notBlank(flowName);
+ Validate.isTrue(version > 0);
+ Validate.isTrue(snapshotTimestamp > 0);
+ }
+
+ @Override
+ public String getBucketId() {
+ return bucketId;
+ }
+
+ @Override
+ public String getBucketName() {
+ return bucketName;
+ }
+
+ @Override
+ public String getFlowId() {
+ return flowId;
+ }
+
+ @Override
+ public String getFlowName() {
+ return flowName;
+ }
+
+ @Override
+ public int getVersion() {
+ return version;
+ }
+
+ @Override
+ public String getComments() {
+ return comments;
+ }
+
+ @Override
+ public long getSnapshotTimestamp() {
+ return snapshotTimestamp;
+ }
+
+ /**
+ * Builder for creating instances of StandardFlowSnapshotContext.
+ */
+ public static class Builder {
+
+ private String bucketId;
+ private String bucketName;
+ private String flowId;
+ private String flowName;
+ private int version;
+ private String comments;
+ private long snapshotTimestamp;
+
+ public Builder() {
+
+ }
+
+ public Builder(final Bucket bucket, final VersionedFlowSnapshotMetadata snapshotMetadata) {
+ bucketId(bucket.getIdentifier());
+ bucketName(bucket.getName());
+ flowId(snapshotMetadata.getFlowIdentifier());
+ flowName(snapshotMetadata.getFlowName());
+ version(snapshotMetadata.getVersion());
+ comments(snapshotMetadata.getComments());
+ snapshotTimestamp(snapshotMetadata.getTimestamp());
+ }
+
+ public Builder bucketId(final String bucketId) {
+ this.bucketId = bucketId;
+ return this;
+ }
+
+ public Builder bucketName(final String bucketName) {
+ this.bucketName = bucketName;
+ return this;
+ }
+
+ public Builder flowId(final String flowId) {
+ this.flowId = flowId;
+ return this;
+ }
+
+ public Builder flowName(final String flowName) {
+ this.flowName = flowName;
+ return this;
+ }
+
+ public Builder version(final int version) {
+ this.version = version;
+ return this;
+ }
+
+ public Builder comments(final String comments) {
+ this.comments = comments;
+ return this;
+ }
+
+ public Builder snapshotTimestamp(final long snapshotTimestamp) {
+ this.snapshotTimestamp = snapshotTimestamp;
+ return this;
+ }
+
+ public StandardFlowSnapshotContext build() {
+ return new StandardFlowSnapshotContext(this);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
index eff655d..515de10 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
@@ -40,7 +40,7 @@ public class JAXBSerializer<T> implements Serializer<T> {
try {
this.jaxbContext = JAXBContext.newInstance(clazz);
} catch (JAXBException e) {
- throw new RuntimeException("Unable to create JAXBContext.", e);
+ throw new RuntimeException("Unable to create JAXBContext: " + e.getMessage(), e);
}
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
index c80dc21..b4448a5 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
@@ -17,15 +17,16 @@
package org.apache.nifi.registry.service;
import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntityType;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntityKey;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
-import org.apache.nifi.registry.metadata.BucketMetadata;
-import org.apache.nifi.registry.metadata.FlowMetadata;
-import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
-import org.apache.nifi.registry.metadata.StandardBucketMetadata;
-import org.apache.nifi.registry.metadata.StandardFlowMetadata;
-import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata;
+import java.util.Date;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.SortedSet;
@@ -36,90 +37,92 @@ import java.util.TreeSet;
*/
public class DataModelMapper {
- public static Bucket map(final BucketMetadata bucketMetadata) {
+ public static BucketEntity map(final Bucket bucket) {
+ final BucketEntity bucketEntity = new BucketEntity();
+ bucketEntity.setId(bucket.getIdentifier());
+ bucketEntity.setName(bucket.getName());
+ bucketEntity.setDescription(bucket.getDescription());
+ bucketEntity.setCreated(new Date(bucket.getCreatedTimestamp()));
+
+ // don't map items on the way in
+
+ return bucketEntity;
+ }
+
+ public static Bucket map(final BucketEntity bucketEntity, final boolean mapChildren) {
final Bucket bucket = new Bucket();
- bucket.setIdentifier(bucketMetadata.getIdentifier());
- bucket.setName(bucketMetadata.getName());
- bucket.setDescription(bucketMetadata.getDescription());
- bucket.setCreatedTimestamp(bucketMetadata.getCreatedTimestamp());
+ bucket.setIdentifier(bucketEntity.getId());
+ bucket.setName(bucketEntity.getName());
+ bucket.setDescription(bucketEntity.getDescription());
+ bucket.setCreatedTimestamp(bucketEntity.getCreated().getTime());
- if (bucketMetadata.getFlowMetadata() != null) {
+ if (mapChildren && bucketEntity.getItems() != null) {
final Set<VersionedFlow> flows = new LinkedHashSet<>();
- bucketMetadata.getFlowMetadata().stream().forEach(f -> flows.add(map(f)));
+ for (final BucketItemEntity itemEntity : bucketEntity.getItems()) {
+ if (BucketItemEntityType.FLOW == itemEntity.getType()) {
+ // we never return the snapshots when retrieving a bucket
+ flows.add(map((FlowEntity) itemEntity, false));
+ }
+ }
bucket.setVersionedFlows(flows);
}
return bucket;
}
- public static BucketMetadata map(final Bucket bucket) {
- final StandardBucketMetadata.Builder builder = new StandardBucketMetadata.Builder()
- .identifier(bucket.getIdentifier())
- .name(bucket.getName())
- .description(bucket.getDescription())
- .created(bucket.getCreatedTimestamp());
+ public static FlowEntity map(final VersionedFlow versionedFlow) {
+ final FlowEntity flowEntity = new FlowEntity();
+ flowEntity.setId(versionedFlow.getIdentifier());
+ flowEntity.setName(versionedFlow.getName());
+ flowEntity.setDescription(versionedFlow.getDescription());
+ flowEntity.setCreated(new Date(versionedFlow.getCreatedTimestamp()));
+ flowEntity.setModified(new Date(versionedFlow.getModifiedTimestamp()));
+ flowEntity.setType(BucketItemEntityType.FLOW);
- if (bucket.getVersionedFlows() != null) {
- bucket.getVersionedFlows().stream().forEach(f -> builder.addFlow(map(f)));
- }
+ // don't map snapshots on the way in
- return builder.build();
+ return flowEntity;
}
- public static VersionedFlow map(final FlowMetadata flowMetadata) {
+ public static VersionedFlow map(final FlowEntity flowEntity, boolean mapChildren) {
final VersionedFlow versionedFlow = new VersionedFlow();
- versionedFlow.setIdentifier(flowMetadata.getIdentifier());
- versionedFlow.setName(flowMetadata.getName());
- versionedFlow.setBucketIdentifier(flowMetadata.getBucketIdentifier());
- versionedFlow.setDescription(flowMetadata.getDescription());
- versionedFlow.setCreatedTimestamp(flowMetadata.getCreatedTimestamp());
- versionedFlow.setModifiedTimestamp(flowMetadata.getModifiedTimestamp());
-
- if (flowMetadata.getSnapshotMetadata() != null) {
+ versionedFlow.setIdentifier(flowEntity.getId());
+ versionedFlow.setBucketIdentifier(flowEntity.getBucket().getId());
+ versionedFlow.setName(flowEntity.getName());
+ versionedFlow.setDescription(flowEntity.getDescription());
+ versionedFlow.setCreatedTimestamp(flowEntity.getCreated().getTime());
+ versionedFlow.setModifiedTimestamp(flowEntity.getModified().getTime());
+
+ if (mapChildren && flowEntity.getSnapshots() != null) {
final SortedSet<VersionedFlowSnapshotMetadata> snapshots = new TreeSet<>();
- flowMetadata.getSnapshotMetadata().stream().forEach(s -> snapshots.add(map(s)));
+ flowEntity.getSnapshots().stream().forEach(s -> snapshots.add(map(s)));
versionedFlow.setSnapshotMetadata(snapshots);
}
return versionedFlow;
}
- public static FlowMetadata map(final VersionedFlow versionedFlow) {
- final StandardFlowMetadata.Builder builder = new StandardFlowMetadata.Builder()
- .identifier(versionedFlow.getIdentifier())
- .name(versionedFlow.getName())
- .bucketIdentifier(versionedFlow.getBucketIdentifier())
- .description(versionedFlow.getDescription())
- .created(versionedFlow.getCreatedTimestamp())
- .modified(versionedFlow.getModifiedTimestamp());
-
- if (versionedFlow.getSnapshotMetadata() != null) {
- versionedFlow.getSnapshotMetadata().stream().forEach(s -> builder.addSnapshot(map(s)));
- }
+ public static FlowSnapshotEntity map(final VersionedFlowSnapshotMetadata versionedFlowSnapshot) {
+ final FlowSnapshotEntityKey key = new FlowSnapshotEntityKey();
+ key.setFlowId(versionedFlowSnapshot.getFlowIdentifier());
+ key.setVersion(versionedFlowSnapshot.getVersion());
- return builder.build();
+ final FlowSnapshotEntity flowSnapshotEntity = new FlowSnapshotEntity();
+ flowSnapshotEntity.setId(key);
+ flowSnapshotEntity.setComments(versionedFlowSnapshot.getComments());
+ flowSnapshotEntity.setCreated(new Date(versionedFlowSnapshot.getTimestamp()));
+ return flowSnapshotEntity;
}
- public static VersionedFlowSnapshotMetadata map(final FlowSnapshotMetadata flowSnapshotMetadata) {
+ public static VersionedFlowSnapshotMetadata map(final FlowSnapshotEntity flowSnapshotEntity) {
final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
- metadata.setBucketIdentifier(flowSnapshotMetadata.getBucketIdentifier());
- metadata.setFlowIdentifier(flowSnapshotMetadata.getFlowIdentifier());
- metadata.setFlowName(flowSnapshotMetadata.getFlowName());
- metadata.setComments(flowSnapshotMetadata.getComments());
- metadata.setTimestamp(flowSnapshotMetadata.getCreatedTimestamp());
- metadata.setVersion(flowSnapshotMetadata.getVersion());
+ metadata.setFlowIdentifier(flowSnapshotEntity.getId().getFlowId());
+ metadata.setVersion(flowSnapshotEntity.getId().getVersion());
+ metadata.setBucketIdentifier(flowSnapshotEntity.getFlow().getBucket().getId());
+ metadata.setFlowName(flowSnapshotEntity.getFlow().getName());
+ metadata.setComments(flowSnapshotEntity.getComments());
+ metadata.setTimestamp(flowSnapshotEntity.getCreated().getTime());
return metadata;
}
- public static FlowSnapshotMetadata map(final VersionedFlowSnapshotMetadata metadata) {
- return new StandardFlowSnapshotMetadata.Builder()
- .bucketIdentifier(metadata.getBucketIdentifier())
- .flowIdentifier(metadata.getFlowIdentifier())
- .flowName(metadata.getFlowName())
- .comments(metadata.getComments())
- .created(metadata.getTimestamp())
- .version(metadata.getVersion())
- .build();
- }
-
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java
new file mode 100644
index 0000000..3c17cee
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+import org.apache.nifi.registry.service.params.QueryParameters;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A service for managing metadata about all objects stored by the registry.
+ *
+ */
+public interface MetadataService {
+
+ /**
+ * Creates the given bucket.
+ *
+ * @param bucket the bucket to create
+ * @return the created bucket
+ */
+ BucketEntity createBucket(BucketEntity bucket);
+
+ /**
+ * Retrieves the bucket with the given id.
+ *
+ * @param bucketIdentifier the id of the bucket to retrieve
+ * @return the bucket with the given id, or null if it does not exist
+ */
+ BucketEntity getBucketById(String bucketIdentifier);
+
+ /**
+ * Retrieves the buckets with the given name. The name comparison must be case-insensitive.
+ *
+ * @param name the name of the bucket to retrieve
+ * @return the buckets with the given name, or empty list if none exist
+ */
+ List<BucketEntity> getBucketsByName(String name);
+
+ /**
+ * Updates the given bucket, only the name and description should be allowed to be updated.
+ *
+ * @param bucket the updated bucket to save
+ * @return the updated bucket, or null if no bucket with the given id exists
+ */
+ BucketEntity updateBucket(BucketEntity bucket);
+
+ /**
+ * Deletes the bucket, as well as any objects that reference the bucket.
+ *
+ * @param bucket the bucket to delete
+ */
+ void deleteBucket(BucketEntity bucket);
+
+ /**
+ * Retrieves all buckets known to this metadata provider.
+ *
+ * @param params the paging and sorting params, or null
+ * @return the set of all buckets
+ */
+ List<BucketEntity> getBuckets(QueryParameters params);
+
+ /**
+ * Retrieves items across all buckets.
+ *
+ * @param queryParameters the parameters for retrieving the items, or null
+ * @return the set of all items
+ */
+ List<BucketItemEntity> getBucketItems(QueryParameters queryParameters);
+
+ /**
+ * Retrieves items for the given bucket.
+ *
+ * @param bucket the bucket to retrieve items for
+ * @param queryParameters the parameters for retrieving the items, or null
+ * @return the set of items for the bucket
+ */
+ List<BucketItemEntity> getBucketItems(QueryParameters queryParameters, BucketEntity bucket);
+
+ /**
+ * Creates a versioned flow in the given bucket.
+ *
+ * @param flow the versioned flow to create
+ * @return the created versioned flow
+ * @throws IllegalStateException if no bucket with the given identifier exists
+ */
+ FlowEntity createFlow(FlowEntity flow);
+
+ /**
+ * Retrieves the versioned flow with the given id.
+ *
+ * @param flowIdentifier the identifier of the flow to retrieve
+ * @return the versioned flow with the given id, or null if no flow with the given id exists
+ */
+ FlowEntity getFlowById(String flowIdentifier);
+
+ /**
+ * Retrieves the versioned flows with the given name. The name comparison must be case-insensitive.
+ *
+ * @param name the name of the flow to retrieve
+ * @return the versioned flows with the given name, or empty list if no flows with the given name exists
+ */
+ List<FlowEntity> getFlowsByName(String name);
+
+ /**
+ * Updates the given versioned flow, only the name and description should be allowed to be updated.
+ *
+ * @param flow the updated versioned flow to save
+ * @return the updated versioned flow
+ */
+ FlowEntity updateFlow(FlowEntity flow);
+
+ /**
+ * Deletes the flow if one exists.
+ *
+ * @param flow the flow to delete
+ */
+ void deleteFlow(FlowEntity flow);
+
+ /**
+ * Retrieves all versioned flows known to this metadata provider.
+ *
+ * @param queryParameters the paging and sorting params, or null
+ * @return the set of all versioned flows
+ */
+ List<FlowEntity> getFlows(QueryParameters queryParameters);
+
+ /**
+ * Creates a versioned flow snapshot.
+ *
+ * @param flowSnapshot the snapshot to create
+ * @return the created snapshot
+ * @throws IllegalStateException if the versioned flow specified by flowSnapshot.getFlowIdentifier() does not exist
+ */
+ FlowSnapshotEntity createFlowSnapshot(FlowSnapshotEntity flowSnapshot);
+
+ /**
+ * Retrieves the snapshot for the given flow identifier and snapshot version.
+ *
+ * @param flowIdentifier the identifier of the flow the snapshot belongs to
+ * @param version the version of the snapshot
+ * @return the versioned flow snapshot for the given flow identifier and version, or null if none exists
+ */
+ FlowSnapshotEntity getFlowSnapshot(String flowIdentifier, Integer version);
+
+ /**
+ * Deletes the flow snapshot.
+ *
+ * @param flowSnapshot the flow snapshot to delete
+ */
+ void deleteFlowSnapshot(FlowSnapshotEntity flowSnapshot);
+
+ /**
+ * @return the set of field names for Buckets
+ */
+ Set<String> getBucketFields();
+
+ /**
+ * @return the set of field names for BucketItems
+ */
+ Set<String> getBucketItemFields();
+
+ /**
+ * @return the set of field names for Flows
+ */
+ Set<String> getFlowFields();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
index 6ca999c..fc067e6 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
@@ -17,23 +17,27 @@
package org.apache.nifi.registry.service;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.bucket.BucketItem;
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
import org.apache.nifi.registry.exception.ResourceNotFoundException;
import org.apache.nifi.registry.flow.FlowPersistenceProvider;
import org.apache.nifi.registry.flow.FlowSnapshotContext;
-import org.apache.nifi.registry.flow.StandardFlowSnapshotContext;
import org.apache.nifi.registry.flow.VersionedFlow;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
-import org.apache.nifi.registry.metadata.BucketMetadata;
-import org.apache.nifi.registry.metadata.FlowMetadata;
-import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
-import org.apache.nifi.registry.metadata.MetadataProvider;
-import org.apache.nifi.registry.metadata.StandardBucketMetadata;
-import org.apache.nifi.registry.metadata.StandardFlowMetadata;
+import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext;
import org.apache.nifi.registry.serialization.Serializer;
+import org.apache.nifi.registry.service.params.QueryParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
@@ -41,17 +45,29 @@ import javax.validation.Validator;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
-import java.util.Objects;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+/**
+ * Main service for all back-end operations, REST resources should only interact with this service.
+ *
+ * This service is marked as @Transactional so that Spring will automatically start a transaction upon entering
+ * any method, and will rollback the transaction if any Exception is thrown out of a method.
+ *
+ */
@Service
+@Transactional(rollbackFor = Exception.class)
public class RegistryService {
- private final MetadataProvider metadataProvider;
+ private static final Logger LOGGER = LoggerFactory.getLogger(RegistryService.class);
+
+ private final MetadataService metadataService;
private final FlowPersistenceProvider flowPersistenceProvider;
private final Serializer<VersionedFlowSnapshot> snapshotSerializer;
private final Validator validator;
@@ -60,18 +76,19 @@ public class RegistryService {
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
- public RegistryService(@Autowired final MetadataProvider metadataProvider,
- @Autowired final FlowPersistenceProvider flowPersistenceProvider,
- @Autowired final Serializer<VersionedFlowSnapshot> snapshotSerializer,
- @Autowired final Validator validator) {
- this.metadataProvider = metadataProvider;
+ @Autowired
+ public RegistryService(final MetadataService metadataService,
+ final FlowPersistenceProvider flowPersistenceProvider,
+ final Serializer<VersionedFlowSnapshot> snapshotSerializer,
+ final Validator validator) {
+ this.metadataService = metadataService;
this.flowPersistenceProvider = flowPersistenceProvider;
this.snapshotSerializer = snapshotSerializer;
this.validator = validator;
- Objects.requireNonNull(this.metadataProvider);
- Objects.requireNonNull(this.flowPersistenceProvider);
- Objects.requireNonNull(this.snapshotSerializer);
- Objects.requireNonNull(this.validator);
+ Validate.notNull(this.metadataService);
+ Validate.notNull(this.flowPersistenceProvider);
+ Validate.notNull(this.snapshotSerializer);
+ Validate.notNull(this.validator);
}
private <T> void validate(T t, String invalidMessage) {
@@ -97,41 +114,41 @@ public class RegistryService {
writeLock.lock();
try {
- final BucketMetadata existingBucketWithSameName = metadataProvider.getBucketByName(bucket.getName());
- if (existingBucketWithSameName != null) {
- throw new IllegalStateException("A bucket with the same name already exists: " + existingBucketWithSameName.getIdentifier());
+ final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName());
+ if (bucketsWithSameName.size() > 0) {
+ throw new IllegalStateException("A bucket with the same name already exists");
}
- final BucketMetadata createdBucket = metadataProvider.createBucket(DataModelMapper.map(bucket));
- return DataModelMapper.map(createdBucket);
+ final BucketEntity createdBucket = metadataService.createBucket(DataModelMapper.map(bucket));
+ return DataModelMapper.map(createdBucket, false);
} finally {
writeLock.unlock();
}
}
- public Bucket getBucket(final String bucketIdentifier) {
+ public Bucket getBucket(final String bucketIdentifier, final boolean verbose) {
if (bucketIdentifier == null) {
throw new IllegalArgumentException("Bucket Identifier cannot be null");
}
readLock.lock();
try {
- final BucketMetadata bucket = metadataProvider.getBucketById(bucketIdentifier);
+ final BucketEntity bucket = metadataService.getBucketById(bucketIdentifier);
if (bucket == null) {
throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
}
- return DataModelMapper.map(bucket);
+ return DataModelMapper.map(bucket, verbose);
} finally {
readLock.unlock();
}
}
- public Set<Bucket> getBuckets() {
+ public List<Bucket> getBuckets(final QueryParameters queryParameters) {
readLock.lock();
try {
- final Set<BucketMetadata> buckets = metadataProvider.getBuckets();
- return buckets.stream().map(b -> DataModelMapper.map(b)).collect(Collectors.toSet());
+ final List<BucketEntity> buckets = metadataService.getBuckets(queryParameters);
+ return buckets.stream().map(b -> DataModelMapper.map(b, false)).collect(Collectors.toList());
} finally {
readLock.unlock();
}
@@ -149,7 +166,7 @@ public class RegistryService {
writeLock.lock();
try {
// ensure a bucket with the given id exists
- final BucketMetadata existingBucketById = metadataProvider.getBucketById(bucket.getIdentifier());
+ final BucketEntity existingBucketById = metadataService.getBucketById(bucket.getIdentifier());
if (existingBucketById == null) {
throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucket.getIdentifier());
}
@@ -157,26 +174,28 @@ public class RegistryService {
// ensure a different bucket with the same name does not exist
// since we're allowing partial updates here, only check this if a non-null name is provided
if (StringUtils.isNotBlank(bucket.getName())) {
- final BucketMetadata existingBucketWithSameName = metadataProvider.getBucketByName(bucket.getName());
- if (existingBucketWithSameName != null && !existingBucketWithSameName.getIdentifier().equals(existingBucketById.getIdentifier())) {
- throw new IllegalStateException("A bucket with the same name already exists: " + bucket.getName());
+ final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName());
+ if (bucketsWithSameName != null) {
+ for (final BucketEntity bucketWithSameName : bucketsWithSameName) {
+ if (!bucketWithSameName.getId().equals(existingBucketById.getId())){
+ throw new IllegalStateException("A bucket with the same name already exists: " + bucket.getName());
+ }
+ }
}
}
- final StandardBucketMetadata.Builder builder = new StandardBucketMetadata.Builder(existingBucketById);
-
// transfer over the new values to the existing bucket
if (StringUtils.isNotBlank(bucket.getName())) {
- builder.name(bucket.getName());
+ existingBucketById.setName(bucket.getName());
}
if (bucket.getDescription() != null) {
- builder.description(bucket.getDescription());
+ existingBucketById.setDescription(bucket.getDescription());
}
// perform the actual update
- final BucketMetadata updatedBucket = metadataProvider.updateBucket(builder.build());
- return DataModelMapper.map(updatedBucket);
+ final BucketEntity updatedBucket = metadataService.updateBucket(existingBucketById);
+ return DataModelMapper.map(updatedBucket, false);
} finally {
writeLock.unlock();
}
@@ -190,28 +209,67 @@ public class RegistryService {
writeLock.lock();
try {
// ensure the bucket exists
- final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketIdentifier);
+ final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
if (existingBucket == null) {
throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
}
- // retrieve the versioned flows that are in this bucket
- final Set<FlowMetadata> bucketFlows = metadataProvider.getFlows(bucketIdentifier);
-
// for each flow in the bucket, delete all snapshots from the flow persistence provider
- for (final FlowMetadata bucketFlow : bucketFlows) {
- flowPersistenceProvider.deleteSnapshots(bucketIdentifier, bucketFlow.getIdentifier());
+ for (final FlowEntity flowEntity : existingBucket.getFlows()) {
+ flowPersistenceProvider.deleteSnapshots(bucketIdentifier, flowEntity.getId());
}
// now delete the bucket from the metadata provider, which deletes all flows referencing it
- metadataProvider.deleteBucket(bucketIdentifier);
+ metadataService.deleteBucket(existingBucket);
- return DataModelMapper.map(existingBucket);
+ return DataModelMapper.map(existingBucket, false);
} finally {
writeLock.unlock();
}
}
+ // ---------------------- BucketItem methods ---------------------------------------------
+
+ public List<BucketItem> getBucketItems(final QueryParameters queryParameters) {
+ readLock.lock();
+ try {
+ final List<BucketItem> bucketItems = new ArrayList<>();
+ metadataService.getBucketItems(queryParameters).stream().forEach(b -> addBucketItem(bucketItems, b));
+ return bucketItems;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public List<BucketItem> getBucketItems(final QueryParameters queryParameters, final String bucketIdentifier) {
+ if (bucketIdentifier == null) {
+ throw new IllegalArgumentException("Bucket Identifier cannot be null");
+ }
+
+ readLock.lock();
+ try {
+ final BucketEntity bucket = metadataService.getBucketById(bucketIdentifier);
+ if (bucket == null) {
+ throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
+ }
+
+ final List<BucketItem> bucketItems = new ArrayList<>();
+ metadataService.getBucketItems(queryParameters, bucket).stream().forEach(b -> addBucketItem(bucketItems, b));
+ return bucketItems;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private void addBucketItem(final List<BucketItem> bucketItems, final BucketItemEntity itemEntity) {
+ if (itemEntity instanceof FlowEntity) {
+ final FlowEntity flowEntity = (FlowEntity) itemEntity;
+ bucketItems.add(DataModelMapper.map(flowEntity, false));
+ } else {
+ LOGGER.error("Unknown type of BucketItemEntity: " + itemEntity.getClass().getCanonicalName());
+ }
+ }
+
// ---------------------- VersionedFlow methods ---------------------------------------------
public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) {
@@ -245,66 +303,73 @@ public class RegistryService {
writeLock.lock();
try {
// ensure the bucket exists
- final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketIdentifier);
+ final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
if (existingBucket == null) {
throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
}
- final FlowMetadata existingFlowWithSameName = metadataProvider.getFlowByName(versionedFlow.getName());
- if (existingFlowWithSameName != null) {
- throw new IllegalStateException("A VersionedFlow with the same name already exists: " + existingFlowWithSameName.getIdentifier());
+ // ensure another flow with the same name doesn't exist
+ final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(versionedFlow.getName());
+ if (flowsWithSameName != null && flowsWithSameName.size() > 0) {
+ throw new IllegalStateException("A VersionedFlow with the same name already exists");
}
- // create the flow
- final FlowMetadata createdFlow = metadataProvider.createFlow(bucketIdentifier, DataModelMapper.map(versionedFlow));
- return DataModelMapper.map(createdFlow);
+ // convert from dto to entity and set the bucket relationship
+ final FlowEntity flowEntity = DataModelMapper.map(versionedFlow);
+ flowEntity.setBucket(existingBucket);
+
+ // persist the flow and return the created entity
+ final FlowEntity createdFlow = metadataService.createFlow(flowEntity);
+ return DataModelMapper.map(createdFlow, false);
} finally {
writeLock.unlock();
}
}
- public VersionedFlow getFlow(final String flowIdentifier) {
+ public VersionedFlow getFlow(final String flowIdentifier, final boolean verbose) {
if (StringUtils.isBlank(flowIdentifier)) {
throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
}
readLock.lock();
try {
- final FlowMetadata flowMetadata = metadataProvider.getFlowById(flowIdentifier);
- if (flowMetadata == null) {
+ final FlowEntity flowEntity = metadataService.getFlowById(flowIdentifier);
+ if (flowEntity == null) {
throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier);
}
- return DataModelMapper.map(flowMetadata);
+ return DataModelMapper.map(flowEntity, verbose);
} finally {
readLock.unlock();
}
}
- public Set<VersionedFlow> getFlows() {
+ public List<VersionedFlow> getFlows(final QueryParameters queryParameters) {
readLock.lock();
try {
- final Set<FlowMetadata> flows = metadataProvider.getFlows();
- return flows.stream().map(f -> DataModelMapper.map(f)).collect(Collectors.toSet());
+ // return non-verbose set of all flows
+ final List<FlowEntity> flows = metadataService.getFlows(queryParameters);
+ return flows.stream().map(f -> DataModelMapper.map(f, false)).collect(Collectors.toList());
} finally {
readLock.unlock();
}
}
- public Set<VersionedFlow> getFlows(String bucketId) {
+ public List<VersionedFlow> getFlows(final String bucketId) {
if (StringUtils.isBlank(bucketId)) {
throw new IllegalArgumentException("Bucket Identifier cannot be null");
}
readLock.lock();
try {
- final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketId);
+ final BucketEntity existingBucket = metadataService.getBucketById(bucketId);
if (existingBucket == null) {
throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketId);
}
- final Set<FlowMetadata> flows = metadataProvider.getFlows(bucketId);
- return flows.stream().map(f -> DataModelMapper.map(f)).collect(Collectors.toSet());
+ // return non-verbose set of flows for the given bucket
+ final Set<FlowEntity> flows = existingBucket.getFlows();
+ return flows.stream().map(f -> DataModelMapper.map(f, false)).collect(Collectors.toList());
} finally {
readLock.unlock();
}
@@ -322,7 +387,7 @@ public class RegistryService {
writeLock.lock();
try {
// ensure a flow with the given id exists
- final FlowMetadata existingFlow = metadataProvider.getFlowById(versionedFlow.getIdentifier());
+ final FlowEntity existingFlow = metadataService.getFlowById(versionedFlow.getIdentifier());
if (existingFlow == null) {
throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + versionedFlow.getIdentifier());
}
@@ -330,28 +395,28 @@ public class RegistryService {
// ensure a different flow with the same name does not exist
// since we're allowing partial updates here, only check this if a non-null name is provided
if (StringUtils.isNotBlank(versionedFlow.getName())) {
- final FlowMetadata existingFlowWithSameName = metadataProvider.getFlowByName(versionedFlow.getName());
- if (existingFlowWithSameName != null && !existingFlowWithSameName.getIdentifier().equals(existingFlow.getIdentifier())) {
- throw new IllegalStateException("A VersionedFlow with the same name already exists: " + versionedFlow.getName());
+ final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(versionedFlow.getName());
+ if (flowsWithSameName != null) {
+ for (final FlowEntity flowWithSameName : flowsWithSameName) {
+ if(!flowWithSameName.getId().equals(existingFlow.getId())) {
+ throw new IllegalStateException("A VersionedFlow with the same name already exists: " + versionedFlow.getName());
+ }
+ }
}
}
- final StandardFlowMetadata.Builder builder = new StandardFlowMetadata.Builder(existingFlow);
-
// transfer over the new values to the existing flow
if (StringUtils.isNotBlank(versionedFlow.getName())) {
- builder.name(versionedFlow.getName());
+ existingFlow.setName(versionedFlow.getName());
}
if (versionedFlow.getDescription() != null) {
- builder.description(versionedFlow.getDescription());
+ existingFlow.setDescription(versionedFlow.getDescription());
}
- builder.modified(System.currentTimeMillis());
-
// perform the actual update
- final FlowMetadata updatedFlow = metadataProvider.updateFlow(builder.build());
- return DataModelMapper.map(updatedFlow);
+ final FlowEntity updatedFlow = metadataService.updateFlow(existingFlow);
+ return DataModelMapper.map(updatedFlow, false);
} finally {
writeLock.unlock();
}
@@ -365,18 +430,18 @@ public class RegistryService {
writeLock.lock();
try {
// ensure the flow exists
- final FlowMetadata existingFlow = metadataProvider.getFlowById(flowIdentifier);
+ final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier);
if (existingFlow == null) {
throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier);
}
// delete all snapshots from the flow persistence provider
- flowPersistenceProvider.deleteSnapshots(existingFlow.getBucketIdentifier(), existingFlow.getIdentifier());
+ flowPersistenceProvider.deleteSnapshots(existingFlow.getBucket().getId(), existingFlow.getId());
// now delete the flow from the metadata provider
- metadataProvider.deleteFlow(flowIdentifier);
+ metadataService.deleteFlow(existingFlow);
- return DataModelMapper.map(existingFlow);
+ return DataModelMapper.map(existingFlow, false);
} finally {
writeLock.unlock();
}
@@ -401,22 +466,22 @@ public class RegistryService {
final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
// ensure the bucket exists
- final BucketMetadata existingBucket = metadataProvider.getBucketById(snapshotMetadata.getBucketIdentifier());
+ final BucketEntity existingBucket = metadataService.getBucketById(snapshotMetadata.getBucketIdentifier());
if (existingBucket == null) {
throw new ResourceNotFoundException("Bucket does not exist for identifier: " + snapshotMetadata.getBucketIdentifier());
}
// ensure the flow exists
- final FlowMetadata existingFlowMetadata = metadataProvider.getFlowById(snapshotMetadata.getFlowIdentifier());
- if (existingFlowMetadata == null) {
+ final FlowEntity existingFlow = metadataService.getFlowById(snapshotMetadata.getFlowIdentifier());
+ if (existingFlow == null) {
throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + snapshotMetadata.getFlowIdentifier());
}
- final VersionedFlow existingFlow = DataModelMapper.map(existingFlowMetadata);
+ final VersionedFlow versionedFlow = DataModelMapper.map(existingFlow, true);
// if we already have snapshots we need to verify the new one has the correct version
- if (existingFlow.getSnapshotMetadata() != null && existingFlow.getSnapshotMetadata().size() > 0) {
- final VersionedFlowSnapshotMetadata lastSnapshot = existingFlow.getSnapshotMetadata().last();
+ if (versionedFlow.getSnapshotMetadata() != null && versionedFlow.getSnapshotMetadata().size() > 0) {
+ final VersionedFlowSnapshotMetadata lastSnapshot = versionedFlow.getSnapshotMetadata().last();
if (snapshotMetadata.getVersion() <= lastSnapshot.getVersion()) {
throw new IllegalStateException("A VersionedFlowSnapshot with the same version already exists: " + snapshotMetadata.getVersion());
@@ -436,12 +501,17 @@ public class RegistryService {
snapshotSerializer.serialize(flowSnapshot, out);
// save the serialized snapshot to the persistence provider
- final Bucket bucket = DataModelMapper.map(existingBucket);
+ final Bucket bucket = DataModelMapper.map(existingBucket, false);
final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder(bucket, snapshotMetadata).build();
flowPersistenceProvider.saveSnapshot(context, out.toByteArray());
// create snapshot in the metadata provider
- metadataProvider.createFlowSnapshot(DataModelMapper.map(snapshotMetadata));
+ metadataService.createFlowSnapshot(DataModelMapper.map(snapshotMetadata));
+
+ // update the modified date on the flow
+ existingFlow.setModified(new Date());
+ metadataService.updateFlow(existingFlow);
+
return flowSnapshot;
} finally {
writeLock.unlock();
@@ -460,17 +530,17 @@ public class RegistryService {
readLock.lock();
try {
// ensure the snapshot exists
- final FlowSnapshotMetadata snapshotMetadata = metadataProvider.getFlowSnapshot(flowIdentifier, version);
- if (snapshotMetadata == null) {
+ final FlowSnapshotEntity snapshotEntity = metadataService.getFlowSnapshot(flowIdentifier, version);
+ if (snapshotEntity == null) {
throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow " + flowIdentifier + " and version " + version);
}
+ final FlowEntity flow = snapshotEntity.getFlow();
+ final String flowId = flow.getId();
+ final String bucketId = flow.getBucket().getId();
+
// get the serialized bytes of the snapshot
- final byte[] serializedSnapshot = flowPersistenceProvider.getSnapshot(
- snapshotMetadata.getBucketIdentifier(),
- snapshotMetadata.getFlowIdentifier(),
- snapshotMetadata.getVersion()
- );
+ final byte[] serializedSnapshot = flowPersistenceProvider.getSnapshot(bucketId, flowId, version);
if (serializedSnapshot == null || serializedSnapshot.length == 0) {
throw new IllegalStateException("No serialized content found for snapshot with flow identifier "
@@ -496,24 +566,39 @@ public class RegistryService {
writeLock.lock();
try {
// ensure the snapshot exists
- final FlowSnapshotMetadata snapshotMetadata = metadataProvider.getFlowSnapshot(flowIdentifier, version);
- if (snapshotMetadata == null) {
+ final FlowSnapshotEntity snapshotEntity = metadataService.getFlowSnapshot(flowIdentifier, version);
+ if (snapshotEntity == null) {
throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow "
+ flowIdentifier + " and version " + version);
}
+ final FlowEntity flow = snapshotEntity.getFlow();
+ final String flowId = flow.getId();
+ final String bucketId = flow.getBucket().getId();
+
// delete the content of the snapshot
- flowPersistenceProvider.deleteSnapshot(
- snapshotMetadata.getBucketIdentifier(),
- snapshotMetadata.getFlowIdentifier(),
- snapshotMetadata.getVersion());
+ flowPersistenceProvider.deleteSnapshot(bucketId, flowId, version);
// delete the snapshot itself
- metadataProvider.deleteFlowSnapshot(flowIdentifier, version);
- return DataModelMapper.map(snapshotMetadata);
+ metadataService.deleteFlowSnapshot(snapshotEntity);
+ return DataModelMapper.map(snapshotEntity);
} finally {
writeLock.unlock();
}
}
+ // ---------------------- Field methods ---------------------------------------------
+
+ public Set<String> getBucketFields() {
+ return metadataService.getBucketFields();
+ }
+
+ public Set<String> getBucketItemFields() {
+ return metadataService.getBucketItemFields();
+ }
+
+ public Set<String> getFlowFields() {
+ return metadataService.getFlowFields();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/TestService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/TestService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/TestService.java
deleted file mode 100644
index 3160fc1..0000000
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/TestService.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.registry.service;
-
-import org.springframework.stereotype.Service;
-
-/**
- * Test service to verify spring-boot will correctly inject into JAX-RS resource TestResource.
- */
-@Service
-public class TestService {
-
- public String test() {
- return "Testing";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/QueryParameters.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/QueryParameters.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/QueryParameters.java
new file mode 100644
index 0000000..3c62a9d
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/QueryParameters.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service.params;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Parameters to be passed into service layer for methods that require sorting and paging.
+ */
+public class QueryParameters {
+
+ private final Integer pageNum;
+
+ private final Integer numRows;
+
+ private final List<SortParameter> sortParameters;
+
+ private QueryParameters(final Builder builder) {
+ this.pageNum = builder.pageNum;
+ this.numRows = builder.numRows;
+ this.sortParameters = Collections.unmodifiableList(new ArrayList<>(builder.sortParameters));
+
+ if (this.pageNum != null && this.numRows != null) {
+ if (this.pageNum < 0) {
+ throw new IllegalStateException("Offset cannot be negative");
+ }
+
+ if (this.numRows < 0) {
+ throw new IllegalStateException("Number of rows cannot be negative");
+ }
+ }
+ }
+
+ public Integer getPageNum() {
+ return pageNum;
+ }
+
+ public Integer getNumRows() {
+ return numRows;
+ }
+
+ public List<SortParameter> getSortParameters() {
+ return sortParameters;
+ }
+
+ /**
+ * Builder for QueryParameters.
+ */
+ public static class Builder {
+
+ private Integer pageNum;
+ private Integer numRows;
+ private List<SortParameter> sortParameters = new ArrayList<>();
+
+ public Builder pageNum(Integer pageNum) {
+ this.pageNum = pageNum;
+ return this;
+ }
+
+ public Builder numRows(Integer numRows) {
+ this.numRows = numRows;
+ return this;
+ }
+
+ public Builder addSort(final SortParameter sort) {
+ this.sortParameters.add(sort);
+ return this;
+ }
+
+ public Builder addSort(final String fieldName, final SortOrder order) {
+ this.sortParameters.add(new SortParameter(fieldName, order));
+ return this;
+ }
+
+ public Builder addSorts(final Collection<SortParameter> sorts) {
+ if (sorts != null) {
+ this.sortParameters.addAll(sorts);
+ }
+ return this;
+ }
+
+ public Builder clearSorts() {
+ this.sortParameters.clear();
+ return this;
+ }
+
+ public QueryParameters build() {
+ return new QueryParameters(this);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortOrder.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortOrder.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortOrder.java
new file mode 100644
index 0000000..43a3016
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortOrder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service.params;
+
+public enum SortOrder {
+
+ ASC("asc"),
+
+ DESC("desc");
+
+ private final String name;
+
+ SortOrder(final String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public static SortOrder fromString(String order) {
+ if (ASC.getName().equals(order)) {
+ return ASC;
+ }
+
+ if (DESC.getName().equals(order)) {
+ return DESC;
+ }
+
+ throw new IllegalArgumentException("Unknown Sort Order: " + order);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortParameter.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortParameter.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortParameter.java
new file mode 100644
index 0000000..327bd95
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortParameter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service.params;
+
+/**
+ * Sort parameter made up of a field and a sort order.
+ */
+public class SortParameter {
+
+ private final String fieldName;
+
+ private final SortOrder order;
+
+ public SortParameter(final String fieldName, final SortOrder order) {
+ this.fieldName = fieldName;
+ this.order = order;
+
+ if (this.fieldName == null) {
+ throw new IllegalStateException("Field Name cannot be null");
+ }
+
+ if (this.fieldName.trim().isEmpty()) {
+ throw new IllegalStateException("Field Name cannot be blank");
+ }
+
+ if (this.order == null) {
+ throw new IllegalStateException("Order cannot be null");
+ }
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ public SortOrder getOrder() {
+ return order;
+ }
+
+ /**
+ * Parses a sorting expression of the form field:order.
+ *
+ * @param sortExpression the expression
+ * @return the Sort instance
+ */
+ public static SortParameter fromString(final String sortExpression) {
+ if (sortExpression == null) {
+ throw new IllegalArgumentException("Sort cannot be null");
+ }
+
+ final String[] sortParts = sortExpression.split("[:]");
+ if (sortParts.length != 2) {
+ throw new IllegalArgumentException("Sort must be in the form field:order");
+ }
+
+ final String fieldName = sortParts[0];
+ final SortOrder order = SortOrder.fromString(sortParts[1]);
+
+ return new SortParameter(fieldName, order);
+ }
+
+ @Override
+ public String toString() {
+ return fieldName + ":" + order.getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/resources/db/migration/V1__Initial.sql
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/resources/db/migration/V1__Initial.sql b/nifi-registry-framework/src/main/resources/db/migration/V1__Initial.sql
new file mode 100644
index 0000000..f7640ed
--- /dev/null
+++ b/nifi-registry-framework/src/main/resources/db/migration/V1__Initial.sql
@@ -0,0 +1,46 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE BUCKET (
+ ID VARCHAR2(50) NOT NULL PRIMARY KEY,
+ NAME VARCHAR2(200) NOT NULL UNIQUE,
+ DESCRIPTION VARCHAR(4096),
+ CREATED TIMESTAMP NOT NULL
+);
+
+CREATE TABLE BUCKET_ITEM (
+ ID VARCHAR2(50) NOT NULL PRIMARY KEY,
+ NAME VARCHAR2(200) NOT NULL UNIQUE,
+ DESCRIPTION VARCHAR(4096),
+ CREATED TIMESTAMP NOT NULL,
+ MODIFIED TIMESTAMP NOT NULL,
+ ITEM_TYPE VARCHAR(50) NOT NULL,
+ BUCKET_ID VARCHAR2(50) NOT NULL,
+ FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID)
+);
+
+CREATE TABLE FLOW (
+ ID VARCHAR2(50) NOT NULL PRIMARY KEY,
+ FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID)
+);
+
+CREATE TABLE FLOW_SNAPSHOT (
+ FLOW_ID VARCHAR2(50) NOT NULL,
+ VERSION INT NOT NULL,
+ CREATED TIMESTAMP NOT NULL,
+ COMMENTS VARCHAR(4096),
+ PRIMARY KEY (FLOW_ID, VERSION),
+ FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID)
+);
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/xsd/providers.xsd
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/xsd/providers.xsd b/nifi-registry-framework/src/main/xsd/providers.xsd
index cb71ed8..1202f9e 100644
--- a/nifi-registry-framework/src/main/xsd/providers.xsd
+++ b/nifi-registry-framework/src/main/xsd/providers.xsd
@@ -42,7 +42,6 @@
<xs:element name="providers">
<xs:complexType>
<xs:sequence>
- <xs:element name="metadataProvider" type="Provider" minOccurs="1" maxOccurs="1"/>
<xs:element name="flowPersistenceProvider" type="Provider" minOccurs="1" maxOccurs="1" />
</xs:sequence>
</xs:complexType>
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/RepositoryTestApplication.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/RepositoryTestApplication.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/RepositoryTestApplication.java
new file mode 100644
index 0000000..de8cd79
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/RepositoryTestApplication.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
+
+/**
+ * Sets up the application context for database repository tests.
+ *
+ * The @SpringBootTest annotation on the repository tests will find this class by working up the package hierarchy.
+ * This class must be in the "db" package in order to find the entities in "db.entity" and repositories in "db.repository".
+ *
+ * The DataSourceFactory is excluded so that Spring Boot will load an in-memory H2 database.
+ */
+@SpringBootApplication
+@ComponentScan(
+ excludeFilters = {
+ @ComponentScan.Filter(
+ type = FilterType.ASSIGNABLE_TYPE,
+ value = DataSourceFactory.class)
+ })
+public class RepositoryTestApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(RepositoryTestApplication.class, args);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/RepositoryBaseTest.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/RepositoryBaseTest.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/RepositoryBaseTest.java
new file mode 100644
index 0000000..7ccd01f
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/RepositoryBaseTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.repository;
+
+import org.apache.nifi.registry.db.RepositoryTestApplication;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.TestExecutionListeners;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
+import org.springframework.test.context.transaction.TransactionalTestExecutionListener;
+import org.springframework.transaction.annotation.Transactional;
+
+@Transactional
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = RepositoryTestApplication.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
+@TestExecutionListeners({DependencyInjectionTestExecutionListener.class, TransactionalTestExecutionListener.class})
+public abstract class RepositoryBaseTest {
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/TestBucketItemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/TestBucketItemRepository.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/TestBucketItemRepository.java
new file mode 100644
index 0000000..657d599
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/TestBucketItemRepository.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.repository;
+
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Sort;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestBucketItemRepository extends RepositoryBaseTest {
+
+ @Autowired
+ private BucketRepository bucketRepository;
+
+ @Autowired
+ private BucketItemRepository bucketItemRepository;
+
+ @Test
+ public void testFindAllPageable() {
+ final Page<BucketItemEntity> page = bucketItemRepository.findAll(new PageRequest(0, 10));
+ assertNotNull(page);
+ assertEquals(1, page.getTotalPages());
+ assertEquals(3, page.getTotalElements());
+
+ final List<BucketItemEntity> entities = page.getContent();
+ assertNotNull(entities);
+ assertEquals(3, entities.size());
+ }
+
+ @Test
+ public void testFindAll() {
+ final Iterable<BucketItemEntity> entities = bucketItemRepository.findAll();
+ assertNotNull(entities);
+
+ int count = 0;
+ for (BucketItemEntity entity : entities) {
+ count++;
+ }
+ assertEquals(3, count);
+ }
+
+ @Test
+ public void testFindByBucket() {
+ final BucketEntity bucket = bucketRepository.findOne("1");
+ assertNotNull(bucket);
+
+ final List<BucketItemEntity> entities = bucketItemRepository.findByBucket(bucket);
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ }
+
+ @Test
+ public void testFindByBucketPageable() {
+ final BucketEntity bucket = bucketRepository.findOne("1");
+ assertNotNull(bucket);
+
+ final List<BucketItemEntity> entities = bucketItemRepository.findByBucket(bucket, new PageRequest(0, 2, new Sort(Sort.Direction.ASC, "id")));
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ assertEquals("1", entities.get(0).getId());
+ assertEquals("2", entities.get(1).getId());
+ }
+
+ @Test
+ public void testFindByBucketSort() {
+ final BucketEntity bucket = bucketRepository.findOne("1");
+ assertNotNull(bucket);
+
+ final List<BucketItemEntity> entities = bucketItemRepository.findByBucket(bucket, new Sort(Sort.Direction.DESC, "id"));
+ assertNotNull(entities);
+ assertEquals(2, entities.size());
+ assertEquals("2", entities.get(0).getId());
+ assertEquals("1", entities.get(1).getId());
+ }
+
+}