You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/09/21 13:50:40 UTC

[6/7] nifi-registry git commit: NIFIREG-18 Initial plumbling for H2 database - Setup Flyway with initial migration to define tables - Setup entity classes with repositories - Setup unit testing for repositories - Removed existing MetadataProvider concept

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
new file mode 100644
index 0000000..ddad946
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * A FlowPersistenceProvider that uses the local filesystem for storage.
+ */
+public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvider {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(FileSystemFlowPersistenceProvider.class);
+
+    static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory";
+
+    static final String SNAPSHOT_EXTENSION = ".snapshot";
+
+    private File flowStorageDir;
+
+    @Override
+    public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        final Map<String,String> props = configurationContext.getProperties();
+        if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided");
+        }
+
+        final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP);
+        if (StringUtils.isBlank(flowStorageDirValue)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank");
+        }
+
+        try {
+            flowStorageDir = new File(flowStorageDirValue);
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir);
+            LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", new Object[] {flowStorageDir.getAbsolutePath()});
+        } catch (IOException e) {
+            throw new ProviderCreationException(e);
+        }
+    }
+
+    @Override
+    public synchronized void saveSnapshot(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException {
+        final File bucketDir = new File(flowStorageDir, context.getBucketId());
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(bucketDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing bucket directory at " + bucketDir.getAbsolutePath(), e);
+        }
+
+        final File flowDir = new File(bucketDir, context.getFlowId());
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing flow directory at " + flowDir.getAbsolutePath(), e);
+        }
+
+        final String versionString = String.valueOf(context.getVersion());
+        final File versionDir = new File(flowDir, versionString);
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(versionDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing version directory at " + versionDir.getAbsolutePath(), e);
+        }
+
+        final File versionFile = new File(versionDir, versionString + SNAPSHOT_EXTENSION);
+        if (versionFile.exists()) {
+            throw new FlowPersistenceException("Unable to save, a snapshot already exists with version " + versionString);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Saving snapshot with filename {}", new Object[] {versionFile.getAbsolutePath()});
+        }
+
+        try (final OutputStream out = new FileOutputStream(versionFile)) {
+            out.write(content);
+            out.flush();
+        } catch (Exception e) {
+            throw new FlowPersistenceException("Unable to write snapshot to disk due to " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public synchronized byte[] getSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Retrieving snapshot with filename {}", new Object[] {snapshotFile.getAbsolutePath()});
+        }
+
+        if (!snapshotFile.exists()) {
+            return null;
+        }
+
+        try (final InputStream in = new FileInputStream(snapshotFile)){
+            return IOUtils.toByteArray(in);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error reading snapshot file: " + snapshotFile.getAbsolutePath(), e);
+        }
+    }
+
+    @Override
+    public synchronized void deleteSnapshots(final String bucketId, final String flowId) throws FlowPersistenceException {
+        final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId);
+        if (!flowDir.exists()) {
+            LOGGER.debug("Snapshot directory does not exist at {}", new Object[] {flowDir.getAbsolutePath()});
+            return;
+        }
+
+        try {
+            org.apache.commons.io.FileUtils.cleanDirectory(flowDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error deleting snapshots at " + flowDir.getAbsolutePath(), e);
+        }
+    }
+
+    @Override
+    public synchronized void deleteSnapshot(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        if (!snapshotFile.exists()) {
+            LOGGER.debug("Snapshot file does not exist at {}", new Object[] {snapshotFile.getAbsolutePath()});
+            return;
+        }
+
+        final boolean deleted = snapshotFile.delete();
+        if (!deleted) {
+            throw new FlowPersistenceException("Unable to delete snapshot at " + snapshotFile.getAbsolutePath());
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Deleted snapshot at {}", new Object[] {snapshotFile.getAbsolutePath()});
+        }
+    }
+
+    protected File getSnapshotFile(final String bucketId, final String flowId, final int version) {
+        final String snapshotFilename = bucketId + "/" + flowId + "/" + version + "/" + version + SNAPSHOT_EXTENSION;
+        return new File(flowStorageDir, snapshotFilename);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
new file mode 100644
index 0000000..5b1f058
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+
+/**
+ * Standard implementation of FlowSnapshotContext.
+ */
+public class StandardFlowSnapshotContext implements FlowSnapshotContext {
+
+    private final String bucketId;
+    private final String bucketName;
+    private final String flowId;
+    private final String flowName;
+    private final int version;
+    private final String comments;
+    private final long snapshotTimestamp;
+
+    private StandardFlowSnapshotContext(final Builder builder) {
+        this.bucketId = builder.bucketId;
+        this.bucketName = builder.bucketName;
+        this.flowId = builder.flowId;
+        this.flowName = builder.flowName;
+        this.version = builder.version;
+        this.comments = builder.comments;
+        this.snapshotTimestamp = builder.snapshotTimestamp;
+
+        Validate.notBlank(bucketId);
+        Validate.notBlank(bucketName);
+        Validate.notBlank(flowId);
+        Validate.notBlank(flowName);
+        Validate.isTrue(version > 0);
+        Validate.isTrue(snapshotTimestamp > 0);
+    }
+
+    @Override
+    public String getBucketId() {
+        return bucketId;
+    }
+
+    @Override
+    public String getBucketName() {
+        return bucketName;
+    }
+
+    @Override
+    public String getFlowId() {
+        return flowId;
+    }
+
+    @Override
+    public String getFlowName() {
+        return flowName;
+    }
+
+    @Override
+    public int getVersion() {
+        return version;
+    }
+
+    @Override
+    public String getComments() {
+        return comments;
+    }
+
+    @Override
+    public long getSnapshotTimestamp() {
+        return snapshotTimestamp;
+    }
+
+    /**
+     * Builder for creating instances of StandardFlowSnapshotContext.
+     */
+    public static class Builder {
+
+        private String bucketId;
+        private String bucketName;
+        private String flowId;
+        private String flowName;
+        private int version;
+        private String comments;
+        private long snapshotTimestamp;
+
+        public Builder() {
+
+        }
+
+        public Builder(final Bucket bucket, final VersionedFlowSnapshotMetadata snapshotMetadata) {
+            bucketId(bucket.getIdentifier());
+            bucketName(bucket.getName());
+            flowId(snapshotMetadata.getFlowIdentifier());
+            flowName(snapshotMetadata.getFlowName());
+            version(snapshotMetadata.getVersion());
+            comments(snapshotMetadata.getComments());
+            snapshotTimestamp(snapshotMetadata.getTimestamp());
+        }
+
+        public Builder bucketId(final String bucketId) {
+            this.bucketId = bucketId;
+            return this;
+        }
+
+        public Builder bucketName(final String bucketName) {
+            this.bucketName = bucketName;
+            return this;
+        }
+
+        public Builder flowId(final String flowId) {
+            this.flowId = flowId;
+            return this;
+        }
+
+        public Builder flowName(final String flowName) {
+            this.flowName = flowName;
+            return this;
+        }
+
+        public Builder version(final int version) {
+            this.version = version;
+            return this;
+        }
+
+        public Builder comments(final String comments) {
+            this.comments = comments;
+            return this;
+        }
+
+        public Builder snapshotTimestamp(final long snapshotTimestamp) {
+            this.snapshotTimestamp = snapshotTimestamp;
+            return this;
+        }
+
+        public StandardFlowSnapshotContext build() {
+            return new StandardFlowSnapshotContext(this);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
index eff655d..515de10 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
@@ -40,7 +40,7 @@ public class JAXBSerializer<T> implements Serializer<T> {
         try {
             this.jaxbContext = JAXBContext.newInstance(clazz);
         } catch (JAXBException e) {
-            throw new RuntimeException("Unable to create JAXBContext.", e);
+            throw new RuntimeException("Unable to create JAXBContext: " + e.getMessage(), e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
index c80dc21..b4448a5 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
@@ -17,15 +17,16 @@
 package org.apache.nifi.registry.service;
 
 import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntityType;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntityKey;
 import org.apache.nifi.registry.flow.VersionedFlow;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
-import org.apache.nifi.registry.metadata.BucketMetadata;
-import org.apache.nifi.registry.metadata.FlowMetadata;
-import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
-import org.apache.nifi.registry.metadata.StandardBucketMetadata;
-import org.apache.nifi.registry.metadata.StandardFlowMetadata;
-import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata;
 
+import java.util.Date;
 import java.util.LinkedHashSet;
 import java.util.Set;
 import java.util.SortedSet;
@@ -36,90 +37,92 @@ import java.util.TreeSet;
  */
 public class DataModelMapper {
 
-    public static Bucket map(final BucketMetadata bucketMetadata) {
+    public static BucketEntity map(final Bucket bucket) {
+        final BucketEntity bucketEntity = new BucketEntity();
+        bucketEntity.setId(bucket.getIdentifier());
+        bucketEntity.setName(bucket.getName());
+        bucketEntity.setDescription(bucket.getDescription());
+        bucketEntity.setCreated(new Date(bucket.getCreatedTimestamp()));
+
+        // don't map items on the way in
+
+        return bucketEntity;
+    }
+
+    public static Bucket map(final BucketEntity bucketEntity, final boolean mapChildren) {
         final Bucket bucket = new Bucket();
-        bucket.setIdentifier(bucketMetadata.getIdentifier());
-        bucket.setName(bucketMetadata.getName());
-        bucket.setDescription(bucketMetadata.getDescription());
-        bucket.setCreatedTimestamp(bucketMetadata.getCreatedTimestamp());
+        bucket.setIdentifier(bucketEntity.getId());
+        bucket.setName(bucketEntity.getName());
+        bucket.setDescription(bucketEntity.getDescription());
+        bucket.setCreatedTimestamp(bucketEntity.getCreated().getTime());
 
-        if (bucketMetadata.getFlowMetadata() != null) {
+        if (mapChildren && bucketEntity.getItems() != null) {
             final Set<VersionedFlow> flows = new LinkedHashSet<>();
-            bucketMetadata.getFlowMetadata().stream().forEach(f -> flows.add(map(f)));
+            for (final BucketItemEntity itemEntity : bucketEntity.getItems()) {
+                if (BucketItemEntityType.FLOW == itemEntity.getType()) {
+                    // we never return the snapshots when retrieving a bucket
+                    flows.add(map((FlowEntity) itemEntity, false));
+                }
+            }
             bucket.setVersionedFlows(flows);
         }
 
         return bucket;
     }
 
-    public static BucketMetadata map(final Bucket bucket) {
-        final StandardBucketMetadata.Builder builder = new StandardBucketMetadata.Builder()
-                .identifier(bucket.getIdentifier())
-                .name(bucket.getName())
-                .description(bucket.getDescription())
-                .created(bucket.getCreatedTimestamp());
+    public static FlowEntity map(final VersionedFlow versionedFlow) {
+        final FlowEntity flowEntity = new FlowEntity();
+        flowEntity.setId(versionedFlow.getIdentifier());
+        flowEntity.setName(versionedFlow.getName());
+        flowEntity.setDescription(versionedFlow.getDescription());
+        flowEntity.setCreated(new Date(versionedFlow.getCreatedTimestamp()));
+        flowEntity.setModified(new Date(versionedFlow.getModifiedTimestamp()));
+        flowEntity.setType(BucketItemEntityType.FLOW);
 
-        if (bucket.getVersionedFlows() != null) {
-            bucket.getVersionedFlows().stream().forEach(f -> builder.addFlow(map(f)));
-        }
+        // don't map snapshots on the way in
 
-        return builder.build();
+        return flowEntity;
     }
 
-    public static VersionedFlow map(final FlowMetadata flowMetadata) {
+    public static VersionedFlow map(final FlowEntity flowEntity, boolean mapChildren) {
         final VersionedFlow versionedFlow = new VersionedFlow();
-        versionedFlow.setIdentifier(flowMetadata.getIdentifier());
-        versionedFlow.setName(flowMetadata.getName());
-        versionedFlow.setBucketIdentifier(flowMetadata.getBucketIdentifier());
-        versionedFlow.setDescription(flowMetadata.getDescription());
-        versionedFlow.setCreatedTimestamp(flowMetadata.getCreatedTimestamp());
-        versionedFlow.setModifiedTimestamp(flowMetadata.getModifiedTimestamp());
-
-        if (flowMetadata.getSnapshotMetadata() != null) {
+        versionedFlow.setIdentifier(flowEntity.getId());
+        versionedFlow.setBucketIdentifier(flowEntity.getBucket().getId());
+        versionedFlow.setName(flowEntity.getName());
+        versionedFlow.setDescription(flowEntity.getDescription());
+        versionedFlow.setCreatedTimestamp(flowEntity.getCreated().getTime());
+        versionedFlow.setModifiedTimestamp(flowEntity.getModified().getTime());
+
+        if (mapChildren && flowEntity.getSnapshots() != null) {
             final SortedSet<VersionedFlowSnapshotMetadata> snapshots = new TreeSet<>();
-            flowMetadata.getSnapshotMetadata().stream().forEach(s -> snapshots.add(map(s)));
+            flowEntity.getSnapshots().stream().forEach(s -> snapshots.add(map(s)));
             versionedFlow.setSnapshotMetadata(snapshots);
         }
 
         return versionedFlow;
     }
 
-    public static FlowMetadata map(final VersionedFlow versionedFlow) {
-        final StandardFlowMetadata.Builder builder = new StandardFlowMetadata.Builder()
-                .identifier(versionedFlow.getIdentifier())
-                .name(versionedFlow.getName())
-                .bucketIdentifier(versionedFlow.getBucketIdentifier())
-                .description(versionedFlow.getDescription())
-                .created(versionedFlow.getCreatedTimestamp())
-                .modified(versionedFlow.getModifiedTimestamp());
-
-        if (versionedFlow.getSnapshotMetadata() != null) {
-            versionedFlow.getSnapshotMetadata().stream().forEach(s -> builder.addSnapshot(map(s)));
-        }
+    public static FlowSnapshotEntity map(final VersionedFlowSnapshotMetadata versionedFlowSnapshot) {
+        final FlowSnapshotEntityKey key = new FlowSnapshotEntityKey();
+        key.setFlowId(versionedFlowSnapshot.getFlowIdentifier());
+        key.setVersion(versionedFlowSnapshot.getVersion());
 
-        return builder.build();
+        final FlowSnapshotEntity flowSnapshotEntity = new FlowSnapshotEntity();
+        flowSnapshotEntity.setId(key);
+        flowSnapshotEntity.setComments(versionedFlowSnapshot.getComments());
+        flowSnapshotEntity.setCreated(new Date(versionedFlowSnapshot.getTimestamp()));
+        return flowSnapshotEntity;
     }
 
-    public static VersionedFlowSnapshotMetadata map(final FlowSnapshotMetadata flowSnapshotMetadata) {
+    public static VersionedFlowSnapshotMetadata map(final FlowSnapshotEntity flowSnapshotEntity) {
         final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
-        metadata.setBucketIdentifier(flowSnapshotMetadata.getBucketIdentifier());
-        metadata.setFlowIdentifier(flowSnapshotMetadata.getFlowIdentifier());
-        metadata.setFlowName(flowSnapshotMetadata.getFlowName());
-        metadata.setComments(flowSnapshotMetadata.getComments());
-        metadata.setTimestamp(flowSnapshotMetadata.getCreatedTimestamp());
-        metadata.setVersion(flowSnapshotMetadata.getVersion());
+        metadata.setFlowIdentifier(flowSnapshotEntity.getId().getFlowId());
+        metadata.setVersion(flowSnapshotEntity.getId().getVersion());
+        metadata.setBucketIdentifier(flowSnapshotEntity.getFlow().getBucket().getId());
+        metadata.setFlowName(flowSnapshotEntity.getFlow().getName());
+        metadata.setComments(flowSnapshotEntity.getComments());
+        metadata.setTimestamp(flowSnapshotEntity.getCreated().getTime());
         return metadata;
     }
 
-    public static FlowSnapshotMetadata map(final VersionedFlowSnapshotMetadata metadata) {
-        return new StandardFlowSnapshotMetadata.Builder()
-                .bucketIdentifier(metadata.getBucketIdentifier())
-                .flowIdentifier(metadata.getFlowIdentifier())
-                .flowName(metadata.getFlowName())
-                .comments(metadata.getComments())
-                .created(metadata.getTimestamp())
-                .version(metadata.getVersion())
-                .build();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java
new file mode 100644
index 0000000..3c17cee
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+import org.apache.nifi.registry.service.params.QueryParameters;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A service for managing metadata about all objects stored by the registry.
+ *
+ */
+public interface MetadataService {
+
+    /**
+     * Creates the given bucket.
+     *
+     * @param bucket the bucket to create
+     * @return the created bucket
+     */
+    BucketEntity createBucket(BucketEntity bucket);
+
+    /**
+     * Retrieves the bucket with the given id.
+     *
+     * @param bucketIdentifier the id of the bucket to retrieve
+     * @return the bucket with the given id, or null if it does not exist
+     */
+    BucketEntity getBucketById(String bucketIdentifier);
+
+    /**
+     * Retrieves the buckets with the given name. The name comparison must be case-insensitive.
+     *
+     * @param name the name of the bucket to retrieve
+     * @return the buckets with the given name, or empty list if none exist
+     */
+    List<BucketEntity> getBucketsByName(String name);
+
+    /**
+     * Updates the given bucket, only the name and description should be allowed to be updated.
+     *
+     * @param bucket the updated bucket to save
+     * @return the updated bucket, or null if no bucket with the given id exists
+     */
+    BucketEntity updateBucket(BucketEntity bucket);
+
+    /**
+     * Deletes the bucket, as well as any objects that reference the bucket.
+     *
+     * @param bucket the bucket to delete
+     */
+    void deleteBucket(BucketEntity bucket);
+
+    /**
+     * Retrieves all buckets known to this metadata provider.
+     *
+     * @param params the paging and sorting params, or null
+     * @return the set of all buckets
+     */
+    List<BucketEntity> getBuckets(QueryParameters params);
+
+    /**
+     * Retrieves items across all buckets.
+     *
+     * @param queryParameters the parameters for retrieving the items, or null
+     * @return the set of all items
+     */
+    List<BucketItemEntity> getBucketItems(QueryParameters queryParameters);
+
+    /**
+     * Retrieves items for the given bucket.
+     *
+     * @param bucket the bucket to retrieve items for
+     * @param queryParameters the parameters for retrieving the items, or null
+     * @return the set of items for the bucket
+     */
+    List<BucketItemEntity> getBucketItems(QueryParameters queryParameters, BucketEntity bucket);
+
+    /**
+     * Creates a versioned flow in the given bucket.
+     *
+     * @param flow the versioned flow to create
+     * @return the created versioned flow
+     * @throws IllegalStateException if no bucket with the given identifier exists
+     */
+    FlowEntity createFlow(FlowEntity flow);
+
+    /**
+     * Retrieves the versioned flow with the given id.
+     *
+     * @param flowIdentifier the identifier of the flow to retrieve
+     * @return the versioned flow with the given id, or null if no flow with the given id exists
+     */
+    FlowEntity getFlowById(String flowIdentifier);
+
+    /**
+     * Retrieves the versioned flows with the given name. The name comparison must be case-insensitive.
+     *
+     * @param name the name of the flow to retrieve
+     * @return the versioned flows with the given name, or empty list if no flows with the given name exists
+     */
+    List<FlowEntity> getFlowsByName(String name);
+
+    /**
+     * Updates the given versioned flow, only the name and description should be allowed to be updated.
+     *
+     * @param flow the updated versioned flow to save
+     * @return the updated versioned flow
+     */
+    FlowEntity updateFlow(FlowEntity flow);
+
+    /**
+     * Deletes the flow if one exists.
+     *
+     * @param flow the flow to delete
+     */
+    void deleteFlow(FlowEntity flow);
+
+    /**
+     * Retrieves all versioned flows known to this metadata provider.
+     *
+     * @param queryParameters the paging and sorting params, or null
+     * @return the set of all versioned flows
+     */
+    List<FlowEntity> getFlows(QueryParameters queryParameters);
+
+    /**
+     * Creates a versioned flow snapshot.
+     *
+     * @param flowSnapshot the snapshot to create
+     * @return the created snapshot
+     * @throws IllegalStateException if the versioned flow specified by flowSnapshot.getFlowIdentifier() does not exist
+     */
+    FlowSnapshotEntity createFlowSnapshot(FlowSnapshotEntity flowSnapshot);
+
+    /**
+     * Retrieves the snapshot for the given flow identifier and snapshot version.
+     *
+     * @param flowIdentifier the identifier of the flow the snapshot belongs to
+     * @param version the version of the snapshot
+     * @return the versioned flow snapshot for the given flow identifier and version, or null if none exists
+     */
+    FlowSnapshotEntity getFlowSnapshot(String flowIdentifier, Integer version);
+
+    /**
+     * Deletes the flow snapshot.
+     *
+     * @param flowSnapshot the flow snapshot to delete
+     */
+    void deleteFlowSnapshot(FlowSnapshotEntity flowSnapshot);
+
+    /**
+     * @return the set of field names for Buckets
+     */
+    Set<String> getBucketFields();
+
+    /**
+     * @return the set of field names for BucketItems
+     */
+    Set<String> getBucketItemFields();
+
+    /**
+     * @return the set of field names for Flows
+     */
+    Set<String> getFlowFields();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
index 6ca999c..fc067e6 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
@@ -17,23 +17,27 @@
 package org.apache.nifi.registry.service;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
 import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.bucket.BucketItem;
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
 import org.apache.nifi.registry.exception.ResourceNotFoundException;
 import org.apache.nifi.registry.flow.FlowPersistenceProvider;
 import org.apache.nifi.registry.flow.FlowSnapshotContext;
-import org.apache.nifi.registry.flow.StandardFlowSnapshotContext;
 import org.apache.nifi.registry.flow.VersionedFlow;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
-import org.apache.nifi.registry.metadata.BucketMetadata;
-import org.apache.nifi.registry.metadata.FlowMetadata;
-import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
-import org.apache.nifi.registry.metadata.MetadataProvider;
-import org.apache.nifi.registry.metadata.StandardBucketMetadata;
-import org.apache.nifi.registry.metadata.StandardFlowMetadata;
+import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext;
 import org.apache.nifi.registry.serialization.Serializer;
+import org.apache.nifi.registry.service.params.QueryParameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.validation.ConstraintViolation;
 import javax.validation.ConstraintViolationException;
@@ -41,17 +45,29 @@ import javax.validation.Validator;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
-import java.util.Objects;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+/**
+ * Main service for all back-end operations, REST resources should only interact with this service.
+ *
+ * This service is marked as @Transactional so that Spring will automatically start a transaction upon entering
+ * any method, and will rollback the transaction if any Exception is thrown out of a method.
+ *
+ */
 @Service
+@Transactional(rollbackFor = Exception.class)
 public class RegistryService {
 
-    private final MetadataProvider metadataProvider;
+    private static final Logger LOGGER = LoggerFactory.getLogger(RegistryService.class);
+
+    private final MetadataService metadataService;
     private final FlowPersistenceProvider flowPersistenceProvider;
     private final Serializer<VersionedFlowSnapshot> snapshotSerializer;
     private final Validator validator;
@@ -60,18 +76,19 @@ public class RegistryService {
     private final Lock readLock = lock.readLock();
     private final Lock writeLock = lock.writeLock();
 
-    public RegistryService(@Autowired final MetadataProvider metadataProvider,
-                           @Autowired final FlowPersistenceProvider flowPersistenceProvider,
-                           @Autowired final Serializer<VersionedFlowSnapshot> snapshotSerializer,
-                           @Autowired final Validator validator) {
-        this.metadataProvider = metadataProvider;
+    @Autowired
+    public RegistryService(final MetadataService metadataService,
+                           final FlowPersistenceProvider flowPersistenceProvider,
+                           final Serializer<VersionedFlowSnapshot> snapshotSerializer,
+                           final Validator validator) {
+        this.metadataService = metadataService;
         this.flowPersistenceProvider = flowPersistenceProvider;
         this.snapshotSerializer = snapshotSerializer;
         this.validator = validator;
-        Objects.requireNonNull(this.metadataProvider);
-        Objects.requireNonNull(this.flowPersistenceProvider);
-        Objects.requireNonNull(this.snapshotSerializer);
-        Objects.requireNonNull(this.validator);
+        Validate.notNull(this.metadataService);
+        Validate.notNull(this.flowPersistenceProvider);
+        Validate.notNull(this.snapshotSerializer);
+        Validate.notNull(this.validator);
     }
 
     private <T>  void validate(T t, String invalidMessage) {
@@ -97,41 +114,41 @@ public class RegistryService {
 
         writeLock.lock();
         try {
-            final BucketMetadata existingBucketWithSameName = metadataProvider.getBucketByName(bucket.getName());
-            if (existingBucketWithSameName != null) {
-                throw new IllegalStateException("A bucket with the same name already exists: " + existingBucketWithSameName.getIdentifier());
+            final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName());
+            if (bucketsWithSameName.size() > 0) {
+                throw new IllegalStateException("A bucket with the same name already exists");
             }
 
-            final BucketMetadata createdBucket = metadataProvider.createBucket(DataModelMapper.map(bucket));
-            return DataModelMapper.map(createdBucket);
+            final BucketEntity createdBucket = metadataService.createBucket(DataModelMapper.map(bucket));
+            return DataModelMapper.map(createdBucket, false);
         } finally {
             writeLock.unlock();
         }
     }
 
-    public Bucket getBucket(final String bucketIdentifier) {
+    public Bucket getBucket(final String bucketIdentifier, final boolean verbose) {
         if (bucketIdentifier == null) {
             throw new IllegalArgumentException("Bucket Identifier cannot be null");
         }
 
         readLock.lock();
         try {
-            final BucketMetadata bucket = metadataProvider.getBucketById(bucketIdentifier);
+            final BucketEntity bucket = metadataService.getBucketById(bucketIdentifier);
             if (bucket == null) {
                 throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
             }
 
-            return DataModelMapper.map(bucket);
+            return DataModelMapper.map(bucket, verbose);
         } finally {
             readLock.unlock();
         }
     }
 
-    public Set<Bucket> getBuckets() {
+    public List<Bucket> getBuckets(final QueryParameters queryParameters) {
         readLock.lock();
         try {
-            final Set<BucketMetadata> buckets = metadataProvider.getBuckets();
-            return buckets.stream().map(b -> DataModelMapper.map(b)).collect(Collectors.toSet());
+            final List<BucketEntity> buckets = metadataService.getBuckets(queryParameters);
+            return buckets.stream().map(b -> DataModelMapper.map(b, false)).collect(Collectors.toList());
         } finally {
             readLock.unlock();
         }
@@ -149,7 +166,7 @@ public class RegistryService {
         writeLock.lock();
         try {
             // ensure a bucket with the given id exists
-            final BucketMetadata existingBucketById = metadataProvider.getBucketById(bucket.getIdentifier());
+            final BucketEntity existingBucketById = metadataService.getBucketById(bucket.getIdentifier());
             if (existingBucketById == null) {
                 throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucket.getIdentifier());
             }
@@ -157,26 +174,28 @@ public class RegistryService {
             // ensure a different bucket with the same name does not exist
             // since we're allowing partial updates here, only check this if a non-null name is provided
             if (StringUtils.isNotBlank(bucket.getName())) {
-                final BucketMetadata existingBucketWithSameName = metadataProvider.getBucketByName(bucket.getName());
-                if (existingBucketWithSameName != null && !existingBucketWithSameName.getIdentifier().equals(existingBucketById.getIdentifier())) {
-                    throw new IllegalStateException("A bucket with the same name already exists: " + bucket.getName());
+                final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName());
+                if (bucketsWithSameName != null) {
+                    for (final BucketEntity bucketWithSameName : bucketsWithSameName) {
+                        if (!bucketWithSameName.getId().equals(existingBucketById.getId())){
+                            throw new IllegalStateException("A bucket with the same name already exists: " + bucket.getName());
+                        }
+                    }
                 }
             }
 
-            final StandardBucketMetadata.Builder builder = new StandardBucketMetadata.Builder(existingBucketById);
-
             // transfer over the new values to the existing bucket
             if (StringUtils.isNotBlank(bucket.getName())) {
-                builder.name(bucket.getName());
+                existingBucketById.setName(bucket.getName());
             }
 
             if (bucket.getDescription() != null) {
-                builder.description(bucket.getDescription());
+                existingBucketById.setDescription(bucket.getDescription());
             }
 
             // perform the actual update
-            final BucketMetadata updatedBucket = metadataProvider.updateBucket(builder.build());
-            return DataModelMapper.map(updatedBucket);
+            final BucketEntity updatedBucket = metadataService.updateBucket(existingBucketById);
+            return DataModelMapper.map(updatedBucket, false);
         } finally {
             writeLock.unlock();
         }
@@ -190,28 +209,67 @@ public class RegistryService {
         writeLock.lock();
         try {
             // ensure the bucket exists
-            final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketIdentifier);
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
             if (existingBucket == null) {
                 throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
             }
 
-            // retrieve the versioned flows that are in this bucket
-            final Set<FlowMetadata> bucketFlows = metadataProvider.getFlows(bucketIdentifier);
-
             // for each flow in the bucket, delete all snapshots from the flow persistence provider
-            for (final FlowMetadata bucketFlow : bucketFlows) {
-                flowPersistenceProvider.deleteSnapshots(bucketIdentifier, bucketFlow.getIdentifier());
+            for (final FlowEntity flowEntity : existingBucket.getFlows()) {
+                flowPersistenceProvider.deleteSnapshots(bucketIdentifier, flowEntity.getId());
             }
 
             // now delete the bucket from the metadata provider, which deletes all flows referencing it
-            metadataProvider.deleteBucket(bucketIdentifier);
+            metadataService.deleteBucket(existingBucket);
 
-            return DataModelMapper.map(existingBucket);
+            return DataModelMapper.map(existingBucket, false);
         } finally {
             writeLock.unlock();
         }
     }
 
+    // ---------------------- BucketItem methods ---------------------------------------------
+
+    public List<BucketItem> getBucketItems(final QueryParameters queryParameters) {
+        readLock.lock();
+        try {
+            final List<BucketItem> bucketItems = new ArrayList<>();
+            metadataService.getBucketItems(queryParameters).stream().forEach(b -> addBucketItem(bucketItems, b));
+            return bucketItems;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<BucketItem> getBucketItems(final QueryParameters queryParameters, final String bucketIdentifier) {
+        if (bucketIdentifier == null) {
+            throw new IllegalArgumentException("Bucket Identifier cannot be null");
+        }
+
+        readLock.lock();
+        try {
+            final BucketEntity bucket = metadataService.getBucketById(bucketIdentifier);
+            if (bucket == null) {
+                throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
+            }
+
+            final List<BucketItem> bucketItems = new ArrayList<>();
+            metadataService.getBucketItems(queryParameters, bucket).stream().forEach(b -> addBucketItem(bucketItems, b));
+            return bucketItems;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    private void addBucketItem(final List<BucketItem> bucketItems, final BucketItemEntity itemEntity) {
+        if (itemEntity instanceof FlowEntity) {
+            final FlowEntity flowEntity = (FlowEntity) itemEntity;
+            bucketItems.add(DataModelMapper.map(flowEntity, false));
+        } else {
+            LOGGER.error("Unknown type of BucketItemEntity: " + itemEntity.getClass().getCanonicalName());
+        }
+    }
+
     // ---------------------- VersionedFlow methods ---------------------------------------------
 
     public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) {
@@ -245,66 +303,73 @@ public class RegistryService {
         writeLock.lock();
         try {
             // ensure the bucket exists
-            final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketIdentifier);
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
             if (existingBucket == null) {
                 throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
             }
 
-            final FlowMetadata existingFlowWithSameName = metadataProvider.getFlowByName(versionedFlow.getName());
-            if (existingFlowWithSameName != null) {
-                throw new IllegalStateException("A VersionedFlow with the same name already exists: " + existingFlowWithSameName.getIdentifier());
+            // ensure another flow with the same name doesn't exist
+            final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(versionedFlow.getName());
+            if (flowsWithSameName != null && flowsWithSameName.size() > 0) {
+                throw new IllegalStateException("A VersionedFlow with the same name already exists");
             }
 
-            // create the flow
-            final FlowMetadata createdFlow = metadataProvider.createFlow(bucketIdentifier, DataModelMapper.map(versionedFlow));
-            return DataModelMapper.map(createdFlow);
+            // convert from dto to entity and set the bucket relationship
+            final FlowEntity flowEntity = DataModelMapper.map(versionedFlow);
+            flowEntity.setBucket(existingBucket);
+
+            // persist the flow and return the created entity
+            final FlowEntity createdFlow = metadataService.createFlow(flowEntity);
+            return DataModelMapper.map(createdFlow, false);
         } finally {
             writeLock.unlock();
         }
     }
 
-    public VersionedFlow getFlow(final String flowIdentifier) {
+    public VersionedFlow getFlow(final String flowIdentifier, final boolean verbose) {
         if (StringUtils.isBlank(flowIdentifier)) {
             throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
         }
 
         readLock.lock();
         try {
-            final FlowMetadata flowMetadata = metadataProvider.getFlowById(flowIdentifier);
-            if (flowMetadata == null) {
+            final FlowEntity flowEntity = metadataService.getFlowById(flowIdentifier);
+            if (flowEntity == null) {
                 throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier);
             }
 
-            return DataModelMapper.map(flowMetadata);
+            return DataModelMapper.map(flowEntity, verbose);
         } finally {
             readLock.unlock();
         }
     }
 
-    public Set<VersionedFlow> getFlows() {
+    public List<VersionedFlow> getFlows(final QueryParameters queryParameters) {
         readLock.lock();
         try {
-            final Set<FlowMetadata> flows = metadataProvider.getFlows();
-            return flows.stream().map(f -> DataModelMapper.map(f)).collect(Collectors.toSet());
+            // return non-verbose set of all flows
+            final List<FlowEntity> flows = metadataService.getFlows(queryParameters);
+            return flows.stream().map(f -> DataModelMapper.map(f, false)).collect(Collectors.toList());
         } finally {
             readLock.unlock();
         }
     }
 
-    public Set<VersionedFlow> getFlows(String bucketId) {
+    public List<VersionedFlow> getFlows(final String bucketId) {
         if (StringUtils.isBlank(bucketId)) {
             throw new IllegalArgumentException("Bucket Identifier cannot be null");
         }
 
         readLock.lock();
         try {
-            final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketId);
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketId);
             if (existingBucket == null) {
                 throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketId);
             }
 
-            final Set<FlowMetadata> flows = metadataProvider.getFlows(bucketId);
-            return flows.stream().map(f -> DataModelMapper.map(f)).collect(Collectors.toSet());
+            // return non-verbose set of flows for the given bucket
+            final Set<FlowEntity> flows = existingBucket.getFlows();
+            return flows.stream().map(f -> DataModelMapper.map(f, false)).collect(Collectors.toList());
         } finally {
             readLock.unlock();
         }
@@ -322,7 +387,7 @@ public class RegistryService {
         writeLock.lock();
         try {
             // ensure a flow with the given id exists
-            final FlowMetadata existingFlow = metadataProvider.getFlowById(versionedFlow.getIdentifier());
+            final FlowEntity existingFlow = metadataService.getFlowById(versionedFlow.getIdentifier());
             if (existingFlow == null) {
                 throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + versionedFlow.getIdentifier());
             }
@@ -330,28 +395,28 @@ public class RegistryService {
             // ensure a different flow with the same name does not exist
             // since we're allowing partial updates here, only check this if a non-null name is provided
             if (StringUtils.isNotBlank(versionedFlow.getName())) {
-                final FlowMetadata existingFlowWithSameName = metadataProvider.getFlowByName(versionedFlow.getName());
-                if (existingFlowWithSameName != null && !existingFlowWithSameName.getIdentifier().equals(existingFlow.getIdentifier())) {
-                    throw new IllegalStateException("A VersionedFlow with the same name already exists: " + versionedFlow.getName());
+                final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(versionedFlow.getName());
+                if (flowsWithSameName != null) {
+                    for (final FlowEntity flowWithSameName : flowsWithSameName) {
+                         if(!flowWithSameName.getId().equals(existingFlow.getId())) {
+                            throw new IllegalStateException("A VersionedFlow with the same name already exists: " + versionedFlow.getName());
+                        }
+                    }
                 }
             }
 
-            final StandardFlowMetadata.Builder builder = new StandardFlowMetadata.Builder(existingFlow);
-
             // transfer over the new values to the existing flow
             if (StringUtils.isNotBlank(versionedFlow.getName())) {
-                builder.name(versionedFlow.getName());
+                existingFlow.setName(versionedFlow.getName());
             }
 
             if (versionedFlow.getDescription() != null) {
-                builder.description(versionedFlow.getDescription());
+                existingFlow.setDescription(versionedFlow.getDescription());
             }
 
-            builder.modified(System.currentTimeMillis());
-
             // perform the actual update
-            final FlowMetadata updatedFlow = metadataProvider.updateFlow(builder.build());
-            return DataModelMapper.map(updatedFlow);
+            final FlowEntity updatedFlow = metadataService.updateFlow(existingFlow);
+            return DataModelMapper.map(updatedFlow, false);
         } finally {
             writeLock.unlock();
         }
@@ -365,18 +430,18 @@ public class RegistryService {
         writeLock.lock();
         try {
             // ensure the flow exists
-            final FlowMetadata existingFlow = metadataProvider.getFlowById(flowIdentifier);
+            final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier);
             if (existingFlow == null) {
                 throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier);
             }
 
             // delete all snapshots from the flow persistence provider
-            flowPersistenceProvider.deleteSnapshots(existingFlow.getBucketIdentifier(), existingFlow.getIdentifier());
+            flowPersistenceProvider.deleteSnapshots(existingFlow.getBucket().getId(), existingFlow.getId());
 
             // now delete the flow from the metadata provider
-            metadataProvider.deleteFlow(flowIdentifier);
+            metadataService.deleteFlow(existingFlow);
 
-            return DataModelMapper.map(existingFlow);
+            return DataModelMapper.map(existingFlow, false);
         } finally {
             writeLock.unlock();
         }
@@ -401,22 +466,22 @@ public class RegistryService {
             final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
 
             // ensure the bucket exists
-            final BucketMetadata existingBucket = metadataProvider.getBucketById(snapshotMetadata.getBucketIdentifier());
+            final BucketEntity existingBucket = metadataService.getBucketById(snapshotMetadata.getBucketIdentifier());
             if (existingBucket == null) {
                 throw new ResourceNotFoundException("Bucket does not exist for identifier: " + snapshotMetadata.getBucketIdentifier());
             }
 
             // ensure the flow exists
-            final FlowMetadata existingFlowMetadata = metadataProvider.getFlowById(snapshotMetadata.getFlowIdentifier());
-            if (existingFlowMetadata == null) {
+            final FlowEntity existingFlow = metadataService.getFlowById(snapshotMetadata.getFlowIdentifier());
+            if (existingFlow == null) {
                 throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + snapshotMetadata.getFlowIdentifier());
             }
 
-            final VersionedFlow existingFlow = DataModelMapper.map(existingFlowMetadata);
+            final VersionedFlow versionedFlow = DataModelMapper.map(existingFlow, true);
 
             // if we already have snapshots we need to verify the new one has the correct version
-            if (existingFlow.getSnapshotMetadata() != null && existingFlow.getSnapshotMetadata().size() > 0) {
-                final VersionedFlowSnapshotMetadata lastSnapshot = existingFlow.getSnapshotMetadata().last();
+            if (versionedFlow.getSnapshotMetadata() != null && versionedFlow.getSnapshotMetadata().size() > 0) {
+                final VersionedFlowSnapshotMetadata lastSnapshot = versionedFlow.getSnapshotMetadata().last();
 
                 if (snapshotMetadata.getVersion() <= lastSnapshot.getVersion()) {
                     throw new IllegalStateException("A VersionedFlowSnapshot with the same version already exists: " + snapshotMetadata.getVersion());
@@ -436,12 +501,17 @@ public class RegistryService {
             snapshotSerializer.serialize(flowSnapshot, out);
 
             // save the serialized snapshot to the persistence provider
-            final Bucket bucket = DataModelMapper.map(existingBucket);
+            final Bucket bucket = DataModelMapper.map(existingBucket, false);
             final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder(bucket, snapshotMetadata).build();
             flowPersistenceProvider.saveSnapshot(context, out.toByteArray());
 
             // create snapshot in the metadata provider
-            metadataProvider.createFlowSnapshot(DataModelMapper.map(snapshotMetadata));
+            metadataService.createFlowSnapshot(DataModelMapper.map(snapshotMetadata));
+
+            // update the modified date on the flow
+            existingFlow.setModified(new Date());
+            metadataService.updateFlow(existingFlow);
+
             return flowSnapshot;
         } finally {
             writeLock.unlock();
@@ -460,17 +530,17 @@ public class RegistryService {
         readLock.lock();
         try {
             // ensure the snapshot exists
-            final FlowSnapshotMetadata snapshotMetadata = metadataProvider.getFlowSnapshot(flowIdentifier, version);
-            if (snapshotMetadata == null) {
+            final FlowSnapshotEntity snapshotEntity = metadataService.getFlowSnapshot(flowIdentifier, version);
+            if (snapshotEntity == null) {
                 throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow " + flowIdentifier + " and version " + version);
             }
 
+            final FlowEntity flow = snapshotEntity.getFlow();
+            final String flowId = flow.getId();
+            final String bucketId = flow.getBucket().getId();
+
             // get the serialized bytes of the snapshot
-            final byte[] serializedSnapshot = flowPersistenceProvider.getSnapshot(
-                    snapshotMetadata.getBucketIdentifier(),
-                    snapshotMetadata.getFlowIdentifier(),
-                    snapshotMetadata.getVersion()
-            );
+            final byte[] serializedSnapshot = flowPersistenceProvider.getSnapshot(bucketId, flowId, version);
 
             if (serializedSnapshot == null || serializedSnapshot.length == 0) {
                 throw new IllegalStateException("No serialized content found for snapshot with flow identifier "
@@ -496,24 +566,39 @@ public class RegistryService {
         writeLock.lock();
         try {
             // ensure the snapshot exists
-            final FlowSnapshotMetadata snapshotMetadata = metadataProvider.getFlowSnapshot(flowIdentifier, version);
-            if (snapshotMetadata == null) {
+            final FlowSnapshotEntity snapshotEntity = metadataService.getFlowSnapshot(flowIdentifier, version);
+            if (snapshotEntity == null) {
                 throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow "
                         + flowIdentifier + " and version " + version);
             }
 
+            final FlowEntity flow = snapshotEntity.getFlow();
+            final String flowId = flow.getId();
+            final String bucketId = flow.getBucket().getId();
+
             // delete the content of the snapshot
-            flowPersistenceProvider.deleteSnapshot(
-                    snapshotMetadata.getBucketIdentifier(),
-                    snapshotMetadata.getFlowIdentifier(),
-                    snapshotMetadata.getVersion());
+            flowPersistenceProvider.deleteSnapshot(bucketId, flowId, version);
 
             // delete the snapshot itself
-            metadataProvider.deleteFlowSnapshot(flowIdentifier, version);
-            return DataModelMapper.map(snapshotMetadata);
+            metadataService.deleteFlowSnapshot(snapshotEntity);
+            return DataModelMapper.map(snapshotEntity);
         } finally {
             writeLock.unlock();
         }
     }
 
+    // ---------------------- Field methods ---------------------------------------------
+
+    public Set<String> getBucketFields() {
+        return metadataService.getBucketFields();
+    }
+
+    public Set<String> getBucketItemFields() {
+        return metadataService.getBucketItemFields();
+    }
+
+    public Set<String> getFlowFields() {
+        return metadataService.getFlowFields();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/TestService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/TestService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/TestService.java
deleted file mode 100644
index 3160fc1..0000000
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/TestService.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.registry.service;
-
-import org.springframework.stereotype.Service;
-
-/**
- * Test service to verify spring-boot will correctly inject into JAX-RS resource TestResource.
- */
-@Service
-public class TestService {
-
-    public String test() {
-        return "Testing";
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/QueryParameters.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/QueryParameters.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/QueryParameters.java
new file mode 100644
index 0000000..3c62a9d
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/QueryParameters.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service.params;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Parameters to be passed into service layer for methods that require sorting and paging.
+ */
+public class QueryParameters {
+
+    private final Integer pageNum;
+
+    private final Integer numRows;
+
+    private final List<SortParameter> sortParameters;
+
+    private QueryParameters(final Builder builder) {
+        this.pageNum = builder.pageNum;
+        this.numRows = builder.numRows;
+        this.sortParameters = Collections.unmodifiableList(new ArrayList<>(builder.sortParameters));
+
+        if (this.pageNum != null && this.numRows != null) {
+            if (this.pageNum < 0) {
+                throw new IllegalStateException("Offset cannot be negative");
+            }
+
+            if (this.numRows < 0) {
+                throw new IllegalStateException("Number of rows cannot be negative");
+            }
+        }
+    }
+
+    public Integer getPageNum() {
+        return pageNum;
+    }
+
+    public Integer getNumRows() {
+        return numRows;
+    }
+
+    public List<SortParameter> getSortParameters() {
+        return sortParameters;
+    }
+
+    /**
+     * Builder for QueryParameters.
+     */
+    public static class Builder {
+
+        private Integer pageNum;
+        private Integer numRows;
+        private List<SortParameter> sortParameters = new ArrayList<>();
+
+        public Builder pageNum(Integer pageNum) {
+            this.pageNum = pageNum;
+            return this;
+        }
+
+        public Builder numRows(Integer numRows) {
+            this.numRows = numRows;
+            return this;
+        }
+
+        public Builder addSort(final SortParameter sort) {
+            this.sortParameters.add(sort);
+            return this;
+        }
+
+        public Builder addSort(final String fieldName, final SortOrder order) {
+            this.sortParameters.add(new SortParameter(fieldName, order));
+            return this;
+        }
+
+        public Builder addSorts(final Collection<SortParameter> sorts) {
+            if (sorts != null) {
+                this.sortParameters.addAll(sorts);
+            }
+            return this;
+        }
+
+        public Builder clearSorts() {
+            this.sortParameters.clear();
+            return this;
+        }
+
+        public QueryParameters build() {
+            return new QueryParameters(this);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortOrder.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortOrder.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortOrder.java
new file mode 100644
index 0000000..43a3016
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortOrder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service.params;
+
+public enum SortOrder {
+
+    ASC("asc"),
+
+    DESC("desc");
+
+    private final String name;
+
+    SortOrder(final String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public static SortOrder fromString(String order) {
+        if (ASC.getName().equals(order)) {
+            return  ASC;
+        }
+
+        if (DESC.getName().equals(order)) {
+            return DESC;
+        }
+
+        throw new IllegalArgumentException("Unknown Sort Order: " + order);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortParameter.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortParameter.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortParameter.java
new file mode 100644
index 0000000..327bd95
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/params/SortParameter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service.params;
+
+/**
+ * Sort parameter made up of a field and a sort order.
+ */
+public class SortParameter {
+
+    private final String fieldName;
+
+    private final SortOrder order;
+
+    public SortParameter(final String fieldName, final SortOrder order) {
+        this.fieldName = fieldName;
+        this.order = order;
+
+        if (this.fieldName == null) {
+            throw new IllegalStateException("Field Name cannot be null");
+        }
+
+        if (this.fieldName.trim().isEmpty()) {
+            throw new IllegalStateException("Field Name cannot be blank");
+        }
+
+        if (this.order == null) {
+            throw new IllegalStateException("Order cannot be null");
+        }
+    }
+
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    public SortOrder getOrder() {
+        return order;
+    }
+
+    /**
+     * Parses a sorting expression of the form field:order.
+     *
+     * @param sortExpression the expression
+     * @return the Sort instance
+     */
+    public static SortParameter fromString(final String sortExpression) {
+        if (sortExpression == null) {
+            throw new IllegalArgumentException("Sort cannot be null");
+        }
+
+        final String[] sortParts = sortExpression.split("[:]");
+        if (sortParts.length != 2) {
+            throw new IllegalArgumentException("Sort must be in the form field:order");
+        }
+
+        final String fieldName = sortParts[0];
+        final SortOrder order = SortOrder.fromString(sortParts[1]);
+
+        return new SortParameter(fieldName, order);
+    }
+
+    @Override
+    public String toString() {
+        return fieldName + ":" + order.getName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/resources/db/migration/V1__Initial.sql
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/resources/db/migration/V1__Initial.sql b/nifi-registry-framework/src/main/resources/db/migration/V1__Initial.sql
new file mode 100644
index 0000000..f7640ed
--- /dev/null
+++ b/nifi-registry-framework/src/main/resources/db/migration/V1__Initial.sql
@@ -0,0 +1,46 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE BUCKET (
+    ID VARCHAR2(50) NOT NULL PRIMARY KEY,
+    NAME VARCHAR2(200) NOT NULL UNIQUE,
+    DESCRIPTION VARCHAR(4096),
+    CREATED TIMESTAMP NOT NULL
+);
+
+CREATE TABLE BUCKET_ITEM (
+    ID VARCHAR2(50) NOT NULL PRIMARY KEY,
+    NAME VARCHAR2(200) NOT NULL UNIQUE,
+    DESCRIPTION VARCHAR(4096),
+    CREATED TIMESTAMP NOT NULL,
+    MODIFIED TIMESTAMP NOT NULL,
+    ITEM_TYPE VARCHAR(50) NOT NULL,
+    BUCKET_ID VARCHAR2(50) NOT NULL,
+    FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID)
+);
+
+CREATE TABLE FLOW (
+    ID VARCHAR2(50) NOT NULL PRIMARY KEY,
+    FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID)
+);
+
+CREATE TABLE FLOW_SNAPSHOT (
+    FLOW_ID VARCHAR2(50) NOT NULL,
+    VERSION INT NOT NULL,
+    CREATED TIMESTAMP NOT NULL,
+    COMMENTS VARCHAR(4096),
+    PRIMARY KEY (FLOW_ID, VERSION),
+    FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID)
+);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/main/xsd/providers.xsd
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/xsd/providers.xsd b/nifi-registry-framework/src/main/xsd/providers.xsd
index cb71ed8..1202f9e 100644
--- a/nifi-registry-framework/src/main/xsd/providers.xsd
+++ b/nifi-registry-framework/src/main/xsd/providers.xsd
@@ -42,7 +42,6 @@
     <xs:element name="providers">
         <xs:complexType>
             <xs:sequence>
-                <xs:element name="metadataProvider" type="Provider" minOccurs="1" maxOccurs="1"/>
                 <xs:element name="flowPersistenceProvider" type="Provider" minOccurs="1" maxOccurs="1" />
             </xs:sequence>
         </xs:complexType>

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/RepositoryTestApplication.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/RepositoryTestApplication.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/RepositoryTestApplication.java
new file mode 100644
index 0000000..de8cd79
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/RepositoryTestApplication.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.FilterType;
+
+/**
+ * Sets up the application context for database repository tests.
+ *
+ * The @SpringBootTest annotation on the repository tests will find this class by working up the package hierarchy.
+ * This class must be in the "db" package in order to find the entities in "db.entity" and repositories in "db.repository".
+ *
+ * The DataSourceFactory is excluded so that Spring Boot will load an in-memory H2 database.
+ */
+@SpringBootApplication
+@ComponentScan(
+        excludeFilters = {
+                @ComponentScan.Filter(
+                        type = FilterType.ASSIGNABLE_TYPE,
+                        value = DataSourceFactory.class)
+        })
+public class RepositoryTestApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(RepositoryTestApplication.class, args);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/RepositoryBaseTest.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/RepositoryBaseTest.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/RepositoryBaseTest.java
new file mode 100644
index 0000000..7ccd01f
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/RepositoryBaseTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.repository;
+
+import org.apache.nifi.registry.db.RepositoryTestApplication;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.TestExecutionListeners;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.test.context.support.DependencyInjectionTestExecutionListener;
+import org.springframework.test.context.transaction.TransactionalTestExecutionListener;
+import org.springframework.transaction.annotation.Transactional;
+
+@Transactional
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = RepositoryTestApplication.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
+@TestExecutionListeners({DependencyInjectionTestExecutionListener.class, TransactionalTestExecutionListener.class})
+public abstract class RepositoryBaseTest {
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/d478c20e/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/TestBucketItemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/TestBucketItemRepository.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/TestBucketItemRepository.java
new file mode 100644
index 0000000..657d599
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/db/repository/TestBucketItemRepository.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.repository;
+
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.PageRequest;
+import org.springframework.data.domain.Sort;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class TestBucketItemRepository extends RepositoryBaseTest {
+
+    @Autowired
+    private BucketRepository bucketRepository;
+
+    @Autowired
+    private BucketItemRepository bucketItemRepository;
+
+    @Test
+    public void testFindAllPageable() {
+        final Page<BucketItemEntity> page = bucketItemRepository.findAll(new PageRequest(0, 10));
+        assertNotNull(page);
+        assertEquals(1, page.getTotalPages());
+        assertEquals(3, page.getTotalElements());
+
+        final List<BucketItemEntity> entities = page.getContent();
+        assertNotNull(entities);
+        assertEquals(3, entities.size());
+    }
+
+    @Test
+    public void testFindAll() {
+        final Iterable<BucketItemEntity> entities = bucketItemRepository.findAll();
+        assertNotNull(entities);
+
+        int count = 0;
+        for (BucketItemEntity entity : entities) {
+            count++;
+        }
+        assertEquals(3, count);
+    }
+
+    @Test
+    public void testFindByBucket() {
+        final BucketEntity bucket = bucketRepository.findOne("1");
+        assertNotNull(bucket);
+
+        final List<BucketItemEntity> entities = bucketItemRepository.findByBucket(bucket);
+        assertNotNull(entities);
+        assertEquals(2, entities.size());
+    }
+
+    @Test
+    public void testFindByBucketPageable() {
+        final BucketEntity bucket = bucketRepository.findOne("1");
+        assertNotNull(bucket);
+
+        final List<BucketItemEntity> entities = bucketItemRepository.findByBucket(bucket, new PageRequest(0, 2, new Sort(Sort.Direction.ASC, "id")));
+        assertNotNull(entities);
+        assertEquals(2, entities.size());
+        assertEquals("1", entities.get(0).getId());
+        assertEquals("2", entities.get(1).getId());
+    }
+
+    @Test
+    public void testFindByBucketSort() {
+        final BucketEntity bucket = bucketRepository.findOne("1");
+        assertNotNull(bucket);
+
+        final List<BucketItemEntity> entities = bucketItemRepository.findByBucket(bucket, new Sort(Sort.Direction.DESC, "id"));
+        assertNotNull(entities);
+        assertEquals(2, entities.size());
+        assertEquals("2", entities.get(0).getId());
+        assertEquals("1", entities.get(1).getId());
+    }
+
+}