You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2018/09/22 02:11:30 UTC

[31/51] [partial] nifi-registry git commit: NIFIREG-201 Refactoring project structure to better isolate extensions

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
new file mode 100644
index 0000000..3436662
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntityType;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+import org.apache.nifi.registry.db.entity.KeyEntity;
+import org.apache.nifi.registry.diff.ComponentDifference;
+import org.apache.nifi.registry.diff.ComponentDifferenceGroup;
+import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.security.key.Key;
+
+import java.util.Date;
+
+/**
+ * Utility for mapping between Provider API and the registry data model.
+ */
+public class DataModelMapper {
+
+    // --- Map buckets
+
+    public static BucketEntity map(final Bucket bucket) {
+        final BucketEntity bucketEntity = new BucketEntity();
+        bucketEntity.setId(bucket.getIdentifier());
+        bucketEntity.setName(bucket.getName());
+        bucketEntity.setDescription(bucket.getDescription());
+        bucketEntity.setCreated(new Date(bucket.getCreatedTimestamp()));
+        return bucketEntity;
+    }
+
+    public static Bucket map(final BucketEntity bucketEntity) {
+        final Bucket bucket = new Bucket();
+        bucket.setIdentifier(bucketEntity.getId());
+        bucket.setName(bucketEntity.getName());
+        bucket.setDescription(bucketEntity.getDescription());
+        bucket.setCreatedTimestamp(bucketEntity.getCreated().getTime());
+        return bucket;
+    }
+
+    // --- Map flows
+
+    public static FlowEntity map(final VersionedFlow versionedFlow) {
+        final FlowEntity flowEntity = new FlowEntity();
+        flowEntity.setId(versionedFlow.getIdentifier());
+        flowEntity.setName(versionedFlow.getName());
+        flowEntity.setDescription(versionedFlow.getDescription());
+        flowEntity.setCreated(new Date(versionedFlow.getCreatedTimestamp()));
+        flowEntity.setModified(new Date(versionedFlow.getModifiedTimestamp()));
+        flowEntity.setType(BucketItemEntityType.FLOW);
+        return flowEntity;
+    }
+
+    public static VersionedFlow map(final BucketEntity bucketEntity, final FlowEntity flowEntity) {
+        final VersionedFlow versionedFlow = new VersionedFlow();
+        versionedFlow.setIdentifier(flowEntity.getId());
+        versionedFlow.setBucketIdentifier(flowEntity.getBucketId());
+        versionedFlow.setName(flowEntity.getName());
+        versionedFlow.setDescription(flowEntity.getDescription());
+        versionedFlow.setCreatedTimestamp(flowEntity.getCreated().getTime());
+        versionedFlow.setModifiedTimestamp(flowEntity.getModified().getTime());
+        versionedFlow.setVersionCount(flowEntity.getSnapshotCount());
+
+        if (bucketEntity != null) {
+            versionedFlow.setBucketName(bucketEntity.getName());
+        } else {
+            versionedFlow.setBucketName(flowEntity.getBucketName());
+        }
+
+        return versionedFlow;
+    }
+
+    // --- Map snapshots
+
+    public static FlowSnapshotEntity map(final VersionedFlowSnapshotMetadata versionedFlowSnapshot) {
+        final FlowSnapshotEntity flowSnapshotEntity = new FlowSnapshotEntity();
+        flowSnapshotEntity.setFlowId(versionedFlowSnapshot.getFlowIdentifier());
+        flowSnapshotEntity.setVersion(versionedFlowSnapshot.getVersion());
+        flowSnapshotEntity.setComments(versionedFlowSnapshot.getComments());
+        flowSnapshotEntity.setCreated(new Date(versionedFlowSnapshot.getTimestamp()));
+        flowSnapshotEntity.setCreatedBy(versionedFlowSnapshot.getAuthor());
+        return flowSnapshotEntity;
+    }
+
+    public static VersionedFlowSnapshotMetadata map(final BucketEntity bucketEntity, final FlowSnapshotEntity flowSnapshotEntity) {
+        final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
+        metadata.setFlowIdentifier(flowSnapshotEntity.getFlowId());
+        metadata.setVersion(flowSnapshotEntity.getVersion());
+        metadata.setComments(flowSnapshotEntity.getComments());
+        metadata.setTimestamp(flowSnapshotEntity.getCreated().getTime());
+        metadata.setAuthor(flowSnapshotEntity.getCreatedBy());
+
+        if (bucketEntity != null) {
+            metadata.setBucketIdentifier(bucketEntity.getId());
+        }
+
+        return metadata;
+    }
+
+    public static ComponentDifference map(final FlowDifference flowDifference){
+        ComponentDifference diff = new ComponentDifference();
+        diff.setChangeDescription(flowDifference.getDescription());
+        diff.setDifferenceType(flowDifference.getDifferenceType().toString());
+        diff.setDifferenceTypeDescription(flowDifference.getDifferenceType().getDescription());
+        diff.setValueA(getValueDescription(flowDifference.getValueA()));
+        diff.setValueB(getValueDescription(flowDifference.getValueB()));
+        return diff;
+    }
+
+    public static ComponentDifferenceGroup map(VersionedComponent versionedComponent){
+        ComponentDifferenceGroup grouping = new ComponentDifferenceGroup();
+        grouping.setComponentId(versionedComponent.getIdentifier());
+        grouping.setComponentName(versionedComponent.getName());
+        grouping.setProcessGroupId(versionedComponent.getGroupIdentifier());
+        grouping.setComponentType(versionedComponent.getComponentType().getTypeName());
+        return grouping;
+    }
+
+    private static String getValueDescription(Object valueA){
+        if(valueA instanceof VersionedComponent){
+            return ((VersionedComponent) valueA).getIdentifier();
+        }
+        if(valueA!= null){
+            return valueA.toString();
+        }
+        return null;
+    }
+
+    // --- Map keys
+
+    public static Key map(final KeyEntity keyEntity) {
+        final Key key = new Key();
+        key.setId(keyEntity.getId());
+        key.setIdentity(keyEntity.getTenantIdentity());
+        key.setKey(keyEntity.getKeyValue());
+        return key;
+    }
+
+    public static KeyEntity map(final Key key) {
+        final KeyEntity keyEntity = new KeyEntity();
+        keyEntity.setId(key.getId());
+        keyEntity.setTenantIdentity(key.getIdentity());
+        keyEntity.setKeyValue(key.getKey());
+        return keyEntity;
+    }
+
+    // map
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java
new file mode 100644
index 0000000..ea0b214
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/MetadataService.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A service for managing metadata about all objects stored by the registry.
+ *
+ */
+public interface MetadataService {
+
+    /**
+     * Creates the given bucket.
+     *
+     * @param bucket the bucket to create
+     * @return the created bucket
+     */
+    BucketEntity createBucket(BucketEntity bucket);
+
+    /**
+     * Retrieves the bucket with the given id.
+     *
+     * @param bucketIdentifier the id of the bucket to retrieve
+     * @return the bucket with the given id, or null if it does not exist
+     */
+    BucketEntity getBucketById(String bucketIdentifier);
+
+    /**
+     * Retrieves the buckets with the given name. The name comparison must be case-insensitive.
+     *
+     * @param name the name of the bucket to retrieve
+     * @return the buckets with the given name, or empty list if none exist
+     */
+    List<BucketEntity> getBucketsByName(String name);
+
+    /**
+     * Updates the given bucket, only the name and description should be allowed to be updated.
+     *
+     * @param bucket the updated bucket to save
+     * @return the updated bucket, or null if no bucket with the given id exists
+     */
+    BucketEntity updateBucket(BucketEntity bucket);
+
+    /**
+     * Deletes the bucket, as well as any objects that reference the bucket.
+     *
+     * @param bucket the bucket to delete
+     */
+    void deleteBucket(BucketEntity bucket);
+
+    /**
+     * Retrieves all buckets with the given ids.
+     *
+     * @param bucketIds the ids of the buckets to retrieve
+     * @return the set of all buckets
+     */
+    List<BucketEntity> getBuckets(Set<String> bucketIds);
+
+    /**
+     * Retrieves all buckets.
+     *
+     * @return the set of all buckets
+     */
+    List<BucketEntity> getAllBuckets();
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Retrieves items for the given bucket.
+     *
+     * @param bucketId the id of bucket to retrieve items for
+     * @return the set of items for the bucket
+     */
+    List<BucketItemEntity> getBucketItems(String bucketId);
+
+    /**
+     * Retrieves items for the given buckets.
+     *
+     * @param bucketIds the ids of buckets to retrieve items for
+     * @return the set of items for the bucket
+     */
+    List<BucketItemEntity> getBucketItems(Set<String> bucketIds);
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a versioned flow in the given bucket.
+     *
+     * @param flow the versioned flow to create
+     * @return the created versioned flow
+     * @throws IllegalStateException if no bucket with the given identifier exists
+     */
+    FlowEntity createFlow(FlowEntity flow);
+
+    /**
+     * Retrieves the versioned flow with the given id and DOES NOT populate the versionCount.
+     *
+     * @param flowIdentifier the identifier of the flow to retrieve
+     * @return the versioned flow with the given id, or null if no flow with the given id exists
+     */
+    FlowEntity getFlowById(String flowIdentifier);
+
+    /**
+     * Retrieves the versioned flow with the given id and DOES populate the versionCount.
+     *
+     * @param flowIdentifier the identifier of the flow to retrieve
+     * @return the versioned flow with the given id, or null if no flow with the given id exists
+     */
+    FlowEntity getFlowByIdWithSnapshotCounts(String flowIdentifier);
+
+    /**
+     * Retrieves the versioned flows with the given name. The name comparison must be case-insensitive.
+     *
+     * @param name the name of the flow to retrieve
+     * @return the versioned flows with the given name, or empty list if no flows with the given name exists
+     */
+    List<FlowEntity> getFlowsByName(String name);
+
+    /**
+     * Retrieves the versioned flows with the given name in the given bucket. The name comparison must be case-insensitive.
+     *
+     * @param  bucketIdentifier the identifier of the bucket
+     * @param name the name of the flow to retrieve
+     * @return the versioned flows with the given name in the given bucket, or empty list if no flows with the given name exists
+     */
+    List<FlowEntity> getFlowsByName(String bucketIdentifier, String name);
+
+    /**
+     * Retrieves the versioned flows for the given bucket.
+     *
+     * @param bucketIdentifier the bucket id to retrieve flows for
+     * @return the flows in the given bucket
+     */
+    List<FlowEntity> getFlowsByBucket(String bucketIdentifier);
+
+    /**
+     * Updates the given versioned flow, only the name and description should be allowed to be updated.
+     *
+     * @param flow the updated versioned flow to save
+     * @return the updated versioned flow
+     */
+    FlowEntity updateFlow(FlowEntity flow);
+
+    /**
+     * Deletes the flow if one exists.
+     *
+     * @param flow the flow to delete
+     */
+    void deleteFlow(FlowEntity flow);
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates a versioned flow snapshot.
+     *
+     * @param flowSnapshot the snapshot to create
+     * @return the created snapshot
+     * @throws IllegalStateException if the versioned flow specified by flowSnapshot.getFlowIdentifier() does not exist
+     */
+    FlowSnapshotEntity createFlowSnapshot(FlowSnapshotEntity flowSnapshot);
+
+    /**
+     * Retrieves the snapshot for the given flow identifier and snapshot version.
+     *
+     * @param flowIdentifier the identifier of the flow the snapshot belongs to
+     * @param version the version of the snapshot
+     * @return the versioned flow snapshot for the given flow identifier and version, or null if none exists
+     */
+    FlowSnapshotEntity getFlowSnapshot(String flowIdentifier, Integer version);
+
+    /**
+     * Retrieves the snapshot with the latest version number for the given flow in the given bucket.
+     *
+     * @param flowIdentifier the id of flow to retrieve the latest snapshot for
+     * @return the latest snapshot for the flow, or null if one doesn't exist
+     */
+    FlowSnapshotEntity getLatestSnapshot(String flowIdentifier);
+
+    /**
+     * Retrieves the snapshots for the given flow in the given bucket.
+     *
+     * @param flowIdentifier the id of the flow
+     * @return the snapshots
+     */
+    List<FlowSnapshotEntity> getSnapshots(String flowIdentifier);
+
+    /**
+     * Deletes the flow snapshot.
+     *
+     * @param flowSnapshot the flow snapshot to delete
+     */
+    void deleteFlowSnapshot(FlowSnapshotEntity flowSnapshot);
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * @return the set of field names for Buckets
+     */
+    Set<String> getBucketFields();
+
+    /**
+     * @return the set of field names for BucketItems
+     */
+    Set<String> getBucketItemFields();
+
+    /**
+     * @return the set of field names for Flows
+     */
+    Set<String> getFlowFields();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/QueryParameters.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/QueryParameters.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/QueryParameters.java
new file mode 100644
index 0000000..99ef9dd
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/QueryParameters.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.params.SortOrder;
+import org.apache.nifi.registry.params.SortParameter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Parameters to be passed into service layer for methods that require sorting and paging.
+ */
+public class QueryParameters {
+
+    public static final QueryParameters EMPTY_PARAMETERS = new QueryParameters.Builder().build();
+
+    private final Integer pageNum;
+
+    private final Integer numRows;
+
+    private final List<SortParameter> sortParameters;
+
+    private QueryParameters(final Builder builder) {
+        this.pageNum = builder.pageNum;
+        this.numRows = builder.numRows;
+        this.sortParameters = Collections.unmodifiableList(new ArrayList<>(builder.sortParameters));
+
+        if (this.pageNum != null && this.numRows != null) {
+            if (this.pageNum < 0) {
+                throw new IllegalStateException("Offset cannot be negative");
+            }
+
+            if (this.numRows < 0) {
+                throw new IllegalStateException("Number of rows cannot be negative");
+            }
+        }
+    }
+
+    public Integer getPageNum() {
+        return pageNum;
+    }
+
+    public Integer getNumRows() {
+        return numRows;
+    }
+
+    public List<SortParameter> getSortParameters() {
+        return sortParameters;
+    }
+
+    /**
+     * Builder for QueryParameters.
+     */
+    public static class Builder {
+
+        private Integer pageNum;
+        private Integer numRows;
+        private List<SortParameter> sortParameters = new ArrayList<>();
+
+        public Builder pageNum(Integer pageNum) {
+            this.pageNum = pageNum;
+            return this;
+        }
+
+        public Builder numRows(Integer numRows) {
+            this.numRows = numRows;
+            return this;
+        }
+
+        public Builder addSort(final SortParameter sort) {
+            this.sortParameters.add(sort);
+            return this;
+        }
+
+        public Builder addSort(final String fieldName, final SortOrder order) {
+            this.sortParameters.add(new SortParameter(fieldName, order));
+            return this;
+        }
+
+        public Builder addSorts(final Collection<SortParameter> sorts) {
+            if (sorts != null) {
+                this.sortParameters.addAll(sorts);
+            }
+            return this;
+        }
+
+        public Builder clearSorts() {
+            this.sortParameters.clear();
+            return this;
+        }
+
+        public QueryParameters build() {
+            return new QueryParameters(this);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
new file mode 100644
index 0000000..23f1d14
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
@@ -0,0 +1,994 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.bucket.BucketItem;
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntity;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+import org.apache.nifi.registry.diff.ComponentDifferenceGroup;
+import org.apache.nifi.registry.diff.VersionedFlowDifference;
+import org.apache.nifi.registry.exception.ResourceNotFoundException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.VersionedComponent;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.flow.diff.ComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.ConciseEvolvingDifferenceDescriptor;
+import org.apache.nifi.registry.flow.diff.FlowComparator;
+import org.apache.nifi.registry.flow.diff.FlowComparison;
+import org.apache.nifi.registry.flow.diff.FlowDifference;
+import org.apache.nifi.registry.flow.diff.StandardComparableDataFlow;
+import org.apache.nifi.registry.flow.diff.StandardFlowComparator;
+import org.apache.nifi.registry.provider.flow.StandardFlowSnapshotContext;
+import org.apache.nifi.registry.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import javax.validation.ConstraintViolation;
+import javax.validation.ConstraintViolationException;
+import javax.validation.Validator;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * Main service for all back-end operations, REST resources should only interact with this service.
+ *
+ * This service is marked as @Transactional so that Spring will automatically start a transaction upon entering
+ * any method, and will rollback the transaction if any Exception is thrown out of a method.
+ *
+ */
+@Service
+@Transactional(rollbackFor = Exception.class)
+public class RegistryService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RegistryService.class);
+
+    private final MetadataService metadataService;
+    private final FlowPersistenceProvider flowPersistenceProvider;
+    private final Serializer<VersionedProcessGroup> processGroupSerializer;
+    private final Validator validator;
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Lock readLock = lock.readLock();
+    private final Lock writeLock = lock.writeLock();
+
+    @Autowired
+    public RegistryService(final MetadataService metadataService,
+                           final FlowPersistenceProvider flowPersistenceProvider,
+                           final Serializer<VersionedProcessGroup> processGroupSerializer,
+                           final Validator validator) {
+        this.metadataService = metadataService;
+        this.flowPersistenceProvider = flowPersistenceProvider;
+        this.processGroupSerializer = processGroupSerializer;
+        this.validator = validator;
+        Validate.notNull(this.metadataService);
+        Validate.notNull(this.flowPersistenceProvider);
+        Validate.notNull(this.processGroupSerializer);
+        Validate.notNull(this.validator);
+    }
+
+    private <T>  void validate(T t, String invalidMessage) {
+        final Set<ConstraintViolation<T>> violations = validator.validate(t);
+        if (violations.size() > 0) {
+            throw new ConstraintViolationException(invalidMessage, violations);
+        }
+    }
+
+    // ---------------------- Bucket methods ---------------------------------------------
+
+    public Bucket createBucket(final Bucket bucket) {
+        if (bucket == null) {
+            throw new IllegalArgumentException("Bucket cannot be null");
+        }
+
+        // set an id, the created time, and clear out the flows since its read-only
+        bucket.setIdentifier(UUID.randomUUID().toString());
+        bucket.setCreatedTimestamp(System.currentTimeMillis());
+
+        validate(bucket, "Cannot create Bucket");
+
+        writeLock.lock();
+        try {
+            final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName());
+            if (bucketsWithSameName.size() > 0) {
+                throw new IllegalStateException("A bucket with the same name already exists");
+            }
+
+            final BucketEntity createdBucket = metadataService.createBucket(DataModelMapper.map(bucket));
+            return DataModelMapper.map(createdBucket);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public Bucket getBucket(final String bucketIdentifier) {
+        if (bucketIdentifier == null) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null");
+        }
+
+        readLock.lock();
+        try {
+            final BucketEntity bucket = metadataService.getBucketById(bucketIdentifier);
+            if (bucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            return DataModelMapper.map(bucket);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<Bucket> getBuckets() {
+        readLock.lock();
+        try {
+            final List<BucketEntity> buckets = metadataService.getAllBuckets();
+            return buckets.stream().map(b -> DataModelMapper.map(b)).collect(Collectors.toList());
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<Bucket> getBuckets(final Set<String> bucketIds) {
+        readLock.lock();
+        try {
+            final List<BucketEntity> buckets = metadataService.getBuckets(bucketIds);
+            return buckets.stream().map(b -> DataModelMapper.map(b)).collect(Collectors.toList());
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public Bucket updateBucket(final Bucket bucket) {
+        if (bucket == null) {
+            throw new IllegalArgumentException("Bucket cannot be null");
+        }
+
+        if (bucket.getIdentifier() == null) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null");
+        }
+
+        if (bucket.getName() != null && StringUtils.isBlank(bucket.getName())) {
+            throw new IllegalArgumentException("Bucket name cannot be blank");
+        }
+
+        writeLock.lock();
+        try {
+            // ensure a bucket with the given id exists
+            final BucketEntity existingBucketById = metadataService.getBucketById(bucket.getIdentifier());
+            if (existingBucketById == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucket.getIdentifier());
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // ensure a different bucket with the same name does not exist
+            // since we're allowing partial updates here, only check this if a non-null name is provided
+            if (StringUtils.isNotBlank(bucket.getName())) {
+                final List<BucketEntity> bucketsWithSameName = metadataService.getBucketsByName(bucket.getName());
+                if (bucketsWithSameName != null) {
+                    for (final BucketEntity bucketWithSameName : bucketsWithSameName) {
+                        if (!bucketWithSameName.getId().equals(existingBucketById.getId())){
+                            throw new IllegalStateException("A bucket with the same name already exists - " + bucket.getName());
+                        }
+                    }
+                }
+            }
+
+            // transfer over the new values to the existing bucket
+            if (StringUtils.isNotBlank(bucket.getName())) {
+                existingBucketById.setName(bucket.getName());
+            }
+
+            if (bucket.getDescription() != null) {
+                existingBucketById.setDescription(bucket.getDescription());
+            }
+
+            // perform the actual update
+            final BucketEntity updatedBucket = metadataService.updateBucket(existingBucketById);
+            return DataModelMapper.map(updatedBucket);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public Bucket deleteBucket(final String bucketIdentifier) {
+        if (bucketIdentifier == null) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null");
+        }
+
+        writeLock.lock();
+        try {
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // for each flow in the bucket, delete all snapshots from the flow persistence provider
+            for (final FlowEntity flowEntity : metadataService.getFlowsByBucket(existingBucket.getId())) {
+                flowPersistenceProvider.deleteAllFlowContent(bucketIdentifier, flowEntity.getId());
+            }
+
+            // now delete the bucket from the metadata provider, which deletes all flows referencing it
+            metadataService.deleteBucket(existingBucket);
+
+            return DataModelMapper.map(existingBucket);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    // ---------------------- BucketItem methods ---------------------------------------------
+
+    public List<BucketItem> getBucketItems(final String bucketIdentifier) {
+        if (bucketIdentifier == null) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null");
+        }
+
+        readLock.lock();
+        try {
+            final BucketEntity bucket = metadataService.getBucketById(bucketIdentifier);
+            if (bucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            final List<BucketItem> bucketItems = new ArrayList<>();
+            metadataService.getBucketItems(bucket.getId()).stream().forEach(b -> addBucketItem(bucketItems, b));
+            return bucketItems;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<BucketItem> getBucketItems(final Set<String> bucketIdentifiers) {
+        if (bucketIdentifiers == null || bucketIdentifiers.isEmpty()) {
+            throw new IllegalArgumentException("Bucket identifiers cannot be null or empty");
+        }
+
+        readLock.lock();
+        try {
+            final List<BucketItem> bucketItems = new ArrayList<>();
+            metadataService.getBucketItems(bucketIdentifiers).stream().forEach(b -> addBucketItem(bucketItems, b));
+            return bucketItems;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    private void addBucketItem(final List<BucketItem> bucketItems, final BucketItemEntity itemEntity) {
+        if (itemEntity instanceof FlowEntity) {
+            final FlowEntity flowEntity = (FlowEntity) itemEntity;
+
+            // Currently we don't populate the bucket name for items
+            bucketItems.add(DataModelMapper.map(null, flowEntity));
+        } else {
+            LOGGER.error("Unknown type of BucketItemEntity: " + itemEntity.getClass().getCanonicalName());
+        }
+    }
+
+    // ---------------------- VersionedFlow methods ---------------------------------------------
+
+    public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) {
+        if (StringUtils.isBlank(bucketIdentifier)) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null or blank");
+        }
+
+        if (versionedFlow == null) {
+            throw new IllegalArgumentException("Versioned flow cannot be null");
+        }
+
+        if (versionedFlow.getBucketIdentifier() != null && !bucketIdentifier.equals(versionedFlow.getBucketIdentifier())) {
+            throw new IllegalArgumentException("Bucket identifiers must match");
+        }
+
+        if (versionedFlow.getBucketIdentifier() == null) {
+            versionedFlow.setBucketIdentifier(bucketIdentifier);
+        }
+
+        versionedFlow.setIdentifier(UUID.randomUUID().toString());
+
+        final long timestamp = System.currentTimeMillis();
+        versionedFlow.setCreatedTimestamp(timestamp);
+        versionedFlow.setModifiedTimestamp(timestamp);
+
+        validate(versionedFlow, "Cannot create versioned flow");
+
+        writeLock.lock();
+        try {
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // ensure another flow with the same name doesn't exist
+            final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(existingBucket.getId(), versionedFlow.getName());
+            if (flowsWithSameName != null && flowsWithSameName.size() > 0) {
+                throw new IllegalStateException("A versioned flow with the same name already exists in the selected bucket");
+            }
+
+            // convert from dto to entity and set the bucket relationship
+            final FlowEntity flowEntity = DataModelMapper.map(versionedFlow);
+            flowEntity.setBucketId(existingBucket.getId());
+
+            // persist the flow and return the created entity
+            final FlowEntity createdFlow = metadataService.createFlow(flowEntity);
+            return DataModelMapper.map(existingBucket, createdFlow);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public VersionedFlow getFlow(final String bucketIdentifier, final String flowIdentifier) {
+        if (StringUtils.isBlank(bucketIdentifier)) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null or blank");
+        }
+
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Versioned flow identifier cannot be null or blank");
+        }
+
+        readLock.lock();
+        try {
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            final FlowEntity existingFlow = metadataService.getFlowByIdWithSnapshotCounts(flowIdentifier);
+            if (existingFlow == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier);
+                throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket.");
+            }
+
+            if (!existingBucket.getId().equals(existingFlow.getBucketId())) {
+                throw new IllegalStateException("The requested flow is not located in the given bucket");
+            }
+
+            return DataModelMapper.map(existingBucket, existingFlow);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public VersionedFlow getFlow(final String flowIdentifier) {
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Versioned flow identifier cannot be null or blank");
+        }
+
+        readLock.lock();
+        try {
+            final FlowEntity existingFlow = metadataService.getFlowByIdWithSnapshotCounts(flowIdentifier);
+            if (existingFlow == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier);
+                throw new ResourceNotFoundException("The specified flow ID does not exist.");
+            }
+
+            final BucketEntity existingBucket = metadataService.getBucketById(existingFlow.getBucketId());
+            return DataModelMapper.map(existingBucket, existingFlow);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<VersionedFlow> getFlows(final String bucketId) {
+        if (StringUtils.isBlank(bucketId)) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null");
+        }
+
+        readLock.lock();
+        try {
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketId);
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketId);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // return non-verbose set of flows for the given bucket
+            final List<FlowEntity> flows = metadataService.getFlowsByBucket(existingBucket.getId());
+            return flows.stream().map(f -> DataModelMapper.map(existingBucket, f)).collect(Collectors.toList());
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public VersionedFlow updateFlow(final VersionedFlow versionedFlow) {
+        if (versionedFlow == null) {
+            throw new IllegalArgumentException("Versioned flow cannot be null");
+        }
+
+        if (StringUtils.isBlank(versionedFlow.getIdentifier())) {
+            throw new IllegalArgumentException("Versioned flow identifier cannot be null or blank");
+        }
+
+        if (StringUtils.isBlank(versionedFlow.getBucketIdentifier())) {
+            throw new IllegalArgumentException("Versioned flow bucket identifier cannot be null or blank");
+        }
+
+        if (versionedFlow.getName() != null && StringUtils.isBlank(versionedFlow.getName())) {
+            throw new IllegalArgumentException("Versioned flow name cannot be blank");
+        }
+
+        writeLock.lock();
+        try {
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(versionedFlow.getBucketIdentifier());
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", versionedFlow.getBucketIdentifier());
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            final FlowEntity existingFlow = metadataService.getFlowByIdWithSnapshotCounts(versionedFlow.getIdentifier());
+            if (existingFlow == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", versionedFlow.getIdentifier());
+                throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket.");
+            }
+
+            if (!existingBucket.getId().equals(existingFlow.getBucketId())) {
+                throw new IllegalStateException("The requested flow is not located in the given bucket");
+            }
+
+            // ensure a different flow with the same name does not exist
+            // since we're allowing partial updates here, only check this if a non-null name is provided
+            if (StringUtils.isNotBlank(versionedFlow.getName())) {
+                final List<FlowEntity> flowsWithSameName = metadataService.getFlowsByName(existingBucket.getId(), versionedFlow.getName());
+                if (flowsWithSameName != null) {
+                    for (final FlowEntity flowWithSameName : flowsWithSameName) {
+                         if(!flowWithSameName.getId().equals(existingFlow.getId())) {
+                            throw new IllegalStateException("A versioned flow with the same name already exists in the selected bucket");
+                        }
+                    }
+                }
+            }
+
+            // transfer over the new values to the existing flow
+            if (StringUtils.isNotBlank(versionedFlow.getName())) {
+                existingFlow.setName(versionedFlow.getName());
+            }
+
+            if (versionedFlow.getDescription() != null) {
+                existingFlow.setDescription(versionedFlow.getDescription());
+            }
+
+            // perform the actual update
+            final FlowEntity updatedFlow = metadataService.updateFlow(existingFlow);
+            return DataModelMapper.map(existingBucket, updatedFlow);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public VersionedFlow deleteFlow(final String bucketIdentifier, final String flowIdentifier) {
+        if (StringUtils.isBlank(bucketIdentifier)) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null or blank");
+        }
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow identifier cannot be null or blank");
+        }
+
+        writeLock.lock();
+        try {
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // ensure the flow exists
+            final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier);
+            if (existingFlow == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier);
+                throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket.");
+            }
+
+            if (!existingBucket.getId().equals(existingFlow.getBucketId())) {
+                throw new IllegalStateException("The requested flow is not located in the given bucket");
+            }
+
+            // delete all snapshots from the flow persistence provider
+            flowPersistenceProvider.deleteAllFlowContent(existingFlow.getBucketId(), existingFlow.getId());
+
+            // now delete the flow from the metadata provider
+            metadataService.deleteFlow(existingFlow);
+
+            return DataModelMapper.map(existingBucket, existingFlow);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    // ---------------------- VersionedFlowSnapshot methods ---------------------------------------------
+
+    public VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) {
+        if (flowSnapshot == null) {
+            throw new IllegalArgumentException("Versioned flow snapshot cannot be null");
+        }
+
+        // validation will ensure that the metadata and contents are not null
+        if (flowSnapshot.getSnapshotMetadata() != null) {
+            flowSnapshot.getSnapshotMetadata().setTimestamp(System.currentTimeMillis());
+        }
+
+        // these fields aren't used for creation
+        flowSnapshot.setFlow(null);
+        flowSnapshot.setBucket(null);
+
+        validate(flowSnapshot, "Cannot create versioned flow snapshot");
+
+        writeLock.lock();
+        try {
+            final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(snapshotMetadata.getBucketIdentifier());
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", snapshotMetadata.getBucketIdentifier());
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // ensure the flow exists
+            final FlowEntity existingFlow = metadataService.getFlowById(snapshotMetadata.getFlowIdentifier());
+            if (existingFlow == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", snapshotMetadata.getFlowIdentifier());
+                throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket.");
+            }
+
+            if (!existingBucket.getId().equals(existingFlow.getBucketId())) {
+                throw new IllegalStateException("The requested flow is not located in the given bucket");
+            }
+
+            // convert the set of FlowSnapshotEntity to set of VersionedFlowSnapshotMetadata
+            final SortedSet<VersionedFlowSnapshotMetadata> sortedSnapshots = new TreeSet<>();
+            final List<FlowSnapshotEntity> existingFlowSnapshots = metadataService.getSnapshots(existingFlow.getId());
+            if (existingFlowSnapshots != null) {
+                existingFlowSnapshots.stream().forEach(s -> sortedSnapshots.add(DataModelMapper.map(existingBucket, s)));
+            }
+
+            // if we already have snapshots we need to verify the new one has the correct version
+            if (sortedSnapshots != null && sortedSnapshots.size() > 0) {
+                final VersionedFlowSnapshotMetadata lastSnapshot = sortedSnapshots.last();
+
+                if (snapshotMetadata.getVersion() <= lastSnapshot.getVersion()) {
+                    throw new IllegalStateException("A Versioned flow snapshot with the same version already exists: " + snapshotMetadata.getVersion());
+                }
+
+                if (snapshotMetadata.getVersion() > (lastSnapshot.getVersion() + 1)) {
+                    throw new IllegalStateException("Version must be a one-up number, last version was "
+                            + lastSnapshot.getVersion() + " and version for this snapshot was "
+                            + snapshotMetadata.getVersion());
+                }
+            } else if (snapshotMetadata.getVersion() != 1) {
+                throw new IllegalStateException("Version of first snapshot must be 1");
+            }
+
+            // serialize the snapshot
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
+            processGroupSerializer.serialize(flowSnapshot.getFlowContents(), out);
+
+            // save the serialized snapshot to the persistence provider
+            final Bucket bucket = DataModelMapper.map(existingBucket);
+            final VersionedFlow versionedFlow = DataModelMapper.map(existingBucket, existingFlow);
+            final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder(bucket, versionedFlow, snapshotMetadata).build();
+            flowPersistenceProvider.saveFlowContent(context, out.toByteArray());
+
+            // create snapshot in the metadata provider
+            metadataService.createFlowSnapshot(DataModelMapper.map(snapshotMetadata));
+
+            // update the modified date on the flow
+            metadataService.updateFlow(existingFlow);
+
+            // get the updated flow, we need to use "with counts" here so we can return this is a part of the response
+            final FlowEntity updatedFlow = metadataService.getFlowByIdWithSnapshotCounts(snapshotMetadata.getFlowIdentifier());
+            if (updatedFlow == null) {
+                throw new ResourceNotFoundException("Versioned flow does not exist for identifier " + snapshotMetadata.getFlowIdentifier());
+            }
+            final VersionedFlow updatedVersionedFlow = DataModelMapper.map(existingBucket, updatedFlow);
+
+            flowSnapshot.setBucket(bucket);
+            flowSnapshot.setFlow(updatedVersionedFlow);
+            return flowSnapshot;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public VersionedFlowSnapshot getFlowSnapshot(final String bucketIdentifier, final String flowIdentifier, final Integer version) {
+        if (StringUtils.isBlank(bucketIdentifier)) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null or blank");
+        }
+
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow identifier cannot be null or blank");
+        }
+
+        if (version == null) {
+            throw new IllegalArgumentException("Version cannot be null or blank");
+        }
+
+        readLock.lock();
+        try {
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // we need to populate the version count here so we have to do this retrieval instead of snapshotEntity.getFlow()
+            final FlowEntity flowEntityWithCount = metadataService.getFlowByIdWithSnapshotCounts(flowIdentifier);
+            if (flowEntityWithCount == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier);
+                throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket.");
+            }
+
+            if (!existingBucket.getId().equals(flowEntityWithCount.getBucketId())) {
+                throw new IllegalStateException("The requested flow is not located in the given bucket");
+            }
+
+            return getVersionedFlowSnapshot(existingBucket, flowEntityWithCount, version);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    private VersionedFlowSnapshot getVersionedFlowSnapshot(final BucketEntity bucketEntity, final FlowEntity flowEntity, final Integer version) {
+        // ensure the snapshot exists
+        final FlowSnapshotEntity snapshotEntity = metadataService.getFlowSnapshot(flowEntity.getId(), version);
+        if (snapshotEntity == null) {
+            LOGGER.warn("The specified flow snapshot id [{}] does not exist for version [{}].", flowEntity.getId(), version);
+            throw new ResourceNotFoundException("The specified versioned flow snapshot does not exist for this flow.");
+        }
+
+        // get the serialized bytes of the snapshot
+        final byte[] serializedSnapshot = flowPersistenceProvider.getFlowContent(bucketEntity.getId(), flowEntity.getId(), version);
+
+        if (serializedSnapshot == null || serializedSnapshot.length == 0) {
+            throw new IllegalStateException("No serialized content found for snapshot with flow identifier "
+                    + flowEntity.getId() + " and version " + version);
+        }
+
+        // deserialize the contents
+        final InputStream input = new ByteArrayInputStream(serializedSnapshot);
+        final VersionedProcessGroup flowContents = processGroupSerializer.deserialize(input);
+
+        // map entities to data model
+        final Bucket bucket = DataModelMapper.map(bucketEntity);
+        final VersionedFlow versionedFlow = DataModelMapper.map(bucketEntity, flowEntity);
+        final VersionedFlowSnapshotMetadata snapshotMetadata = DataModelMapper.map(bucketEntity, snapshotEntity);
+
+        // create the snapshot to return
+        final VersionedFlowSnapshot snapshot = new VersionedFlowSnapshot();
+        snapshot.setFlowContents(flowContents);
+        snapshot.setSnapshotMetadata(snapshotMetadata);
+        snapshot.setFlow(versionedFlow);
+        snapshot.setBucket(bucket);
+        return snapshot;
+    }
+
+    /**
+     * Returns all versions of a flow, sorted newest to oldest.
+     *
+     * @param bucketIdentifier the id of the bucket to search for the flowIdentifier
+     * @param flowIdentifier the id of the flow to retrieve from the specified bucket
+     * @return all versions of the specified flow, sorted newest to oldest
+     */
+    public SortedSet<VersionedFlowSnapshotMetadata> getFlowSnapshots(final String bucketIdentifier, final String flowIdentifier) {
+        if (StringUtils.isBlank(bucketIdentifier)) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null or blank");
+        }
+
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow identifier cannot be null or blank");
+        }
+
+        readLock.lock();
+        try {
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // ensure the flow exists
+            final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier);
+            if (existingFlow == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier);
+                throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket.");
+            }
+
+            if (!existingBucket.getId().equals(existingFlow.getBucketId())) {
+                throw new IllegalStateException("The requested flow is not located in the given bucket");
+            }
+
+            // convert the set of FlowSnapshotEntity to set of VersionedFlowSnapshotMetadata, ordered by version descending
+            final SortedSet<VersionedFlowSnapshotMetadata> sortedSnapshots = new TreeSet<>(Collections.reverseOrder());
+            final List<FlowSnapshotEntity> existingFlowSnapshots = metadataService.getSnapshots(existingFlow.getId());
+            if (existingFlowSnapshots != null) {
+                existingFlowSnapshots.stream().forEach(s -> sortedSnapshots.add(DataModelMapper.map(existingBucket, s)));
+            }
+
+            return sortedSnapshots;
+
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public VersionedFlowSnapshotMetadata getLatestFlowSnapshotMetadata(final String bucketIdentifier, final String flowIdentifier) {
+        if (StringUtils.isBlank(bucketIdentifier)) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null or blank");
+        }
+
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow identifier cannot be null or blank");
+        }
+
+        readLock.lock();
+        try {
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // ensure the flow exists
+            final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier);
+            if (existingFlow == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier);
+                throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket.");
+            }
+
+            if (!existingBucket.getId().equals(existingFlow.getBucketId())) {
+                throw new IllegalStateException("The requested flow is not located in the given bucket");
+            }
+
+            // get latest snapshot for the flow
+            final FlowSnapshotEntity latestSnapshot = metadataService.getLatestSnapshot(existingFlow.getId());
+            if (latestSnapshot == null) {
+                throw new ResourceNotFoundException("The specified flow ID has no versions");
+            }
+
+            return DataModelMapper.map(existingBucket, latestSnapshot);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public VersionedFlowSnapshotMetadata getLatestFlowSnapshotMetadata(final String flowIdentifier) {
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow identifier cannot be null or blank");
+        }
+
+        readLock.lock();
+        try {
+            // ensure the flow exists
+            final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier);
+            if (existingFlow == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier);
+                throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket.");
+            }
+
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(existingFlow.getBucketId());
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", existingFlow.getBucketId());
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // get latest snapshot for the flow
+            final FlowSnapshotEntity latestSnapshot = metadataService.getLatestSnapshot(existingFlow.getId());
+            if (latestSnapshot == null) {
+                throw new ResourceNotFoundException("The specified flow ID has no versions");
+            }
+
+            return DataModelMapper.map(existingBucket, latestSnapshot);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public VersionedFlowSnapshotMetadata deleteFlowSnapshot(final String bucketIdentifier, final String flowIdentifier, final Integer version) {
+        if (StringUtils.isBlank(bucketIdentifier)) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null or blank");
+        }
+
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow identifier cannot be null or blank");
+        }
+
+        if (version == null) {
+            throw new IllegalArgumentException("Version cannot be null or blank");
+        }
+
+        writeLock.lock();
+        try {
+            // ensure the bucket exists
+            final BucketEntity existingBucket = metadataService.getBucketById(bucketIdentifier);
+            if (existingBucket == null) {
+                LOGGER.warn("The specified bucket id [{}] does not exist.", bucketIdentifier);
+                throw new ResourceNotFoundException("The specified bucket ID does not exist in this registry.");
+            }
+
+            // ensure the flow exists
+            final FlowEntity existingFlow = metadataService.getFlowById(flowIdentifier);
+            if (existingFlow == null) {
+                LOGGER.warn("The specified flow id [{}] does not exist.", flowIdentifier);
+                throw new ResourceNotFoundException("The specified flow ID does not exist in this bucket.");
+            }
+
+            if (!existingBucket.getId().equals(existingFlow.getBucketId())) {
+                throw new IllegalStateException("The requested flow is not located in the given bucket");
+            }
+
+            // ensure the snapshot exists
+            final FlowSnapshotEntity snapshotEntity = metadataService.getFlowSnapshot(flowIdentifier, version);
+            if (snapshotEntity == null) {
+                throw new ResourceNotFoundException("Versioned flow snapshot does not exist for flow "
+                        + flowIdentifier + " and version " + version);
+            }
+
+            // delete the content of the snapshot
+            flowPersistenceProvider.deleteFlowContent(bucketIdentifier, flowIdentifier, version);
+
+            // delete the snapshot itself
+            metadataService.deleteFlowSnapshot(snapshotEntity);
+            return DataModelMapper.map(existingBucket, snapshotEntity);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Returns the differences between two specified versions of a flow.
+     *
+     * @param bucketIdentifier the id of the bucket the flow exists in
+     * @param flowIdentifier the flow to be examined
+     * @param versionA the first version of the comparison
+     * @param versionB the second version of the comparison
+     * @return The differences between two specified versions, grouped by component.
+     */
+    public VersionedFlowDifference getFlowDiff(final String bucketIdentifier, final String flowIdentifier,
+                                               final Integer versionA, final Integer versionB) {
+        if (StringUtils.isBlank(bucketIdentifier)) {
+            throw new IllegalArgumentException("Bucket identifier cannot be null or blank");
+        }
+
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow identifier cannot be null or blank");
+        }
+
+        if (versionA == null || versionB == null) {
+            throw new IllegalArgumentException("Version cannot be null or blank");
+        }
+        // older version is always the lower, regardless of the order supplied
+        final Integer older = Math.min(versionA, versionB);
+        final Integer newer = Math.max(versionA, versionB);
+
+        readLock.lock();
+        try {
+            // Get the content for both versions of the flow
+            final byte[] serializedSnapshotA = flowPersistenceProvider.getFlowContent(bucketIdentifier, flowIdentifier, older);
+            if (serializedSnapshotA == null || serializedSnapshotA.length == 0) {
+                throw new IllegalStateException("No serialized content found for snapshot with flow identifier "
+                        + flowIdentifier + " and version " + older);
+            }
+
+            final byte[] serializedSnapshotB = flowPersistenceProvider.getFlowContent(bucketIdentifier, flowIdentifier, newer);
+            if (serializedSnapshotB == null || serializedSnapshotB.length == 0) {
+                throw new IllegalStateException("No serialized content found for snapshot with flow identifier "
+                        + flowIdentifier + " and version " + newer);
+            }
+
+            // deserialize the contents
+            final InputStream inputA = new ByteArrayInputStream(serializedSnapshotA);
+            final VersionedProcessGroup flowContentsA = processGroupSerializer.deserialize(inputA);
+            final InputStream inputB = new ByteArrayInputStream(serializedSnapshotB);
+            final VersionedProcessGroup flowContentsB = processGroupSerializer.deserialize(inputB);
+
+            final ComparableDataFlow comparableFlowA = new StandardComparableDataFlow(String.format("Version %d", older), flowContentsA);
+            final ComparableDataFlow comparableFlowB = new StandardComparableDataFlow(String.format("Version %d", newer), flowContentsB);
+
+            // Compare the two versions of the flow
+            final FlowComparator flowComparator = new StandardFlowComparator(comparableFlowA, comparableFlowB,
+                    null, new ConciseEvolvingDifferenceDescriptor());
+            final FlowComparison flowComparison = flowComparator.compare();
+
+            VersionedFlowDifference result = new VersionedFlowDifference();
+            result.setBucketId(bucketIdentifier);
+            result.setFlowId(flowIdentifier);
+            result.setVersionA(older);
+            result.setVersionB(newer);
+
+            Set<ComponentDifferenceGroup> differenceGroups = getStringComponentDifferenceGroupMap(flowComparison.getDifferences());
+            result.setComponentDifferenceGroups(differenceGroups);
+
+            return result;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Group the differences in the comparison by component
+     * @param flowDifferences The differences to group together by component
+     * @return A set of componentDifferenceGroups where each entry contains a set of differences specific to that group
+     */
+    private Set<ComponentDifferenceGroup> getStringComponentDifferenceGroupMap(Set<FlowDifference> flowDifferences) {
+        Map<String, ComponentDifferenceGroup> differenceGroups = new HashMap<>();
+        for (FlowDifference diff : flowDifferences) {
+            ComponentDifferenceGroup group;
+            // A component may only exist on only one version for new/removed components
+            VersionedComponent component = ObjectUtils.firstNonNull(diff.getComponentA(), diff.getComponentB());
+            if(differenceGroups.containsKey(component.getIdentifier())){
+                group = differenceGroups.get(component.getIdentifier());
+            }else{
+                group = DataModelMapper.map(component);
+                differenceGroups.put(component.getIdentifier(), group);
+            }
+            group.getDifferences().add(DataModelMapper.map(diff));
+        }
+        return differenceGroups.values().stream().collect(Collectors.toSet());
+    }
+
+    // ---------------------- Field methods ---------------------------------------------
+
+    public Set<String> getBucketFields() {
+        return metadataService.getBucketFields();
+    }
+
+    public Set<String> getBucketItemFields() {
+        return metadataService.getBucketItemFields();
+    }
+
+    public Set<String> getFlowFields() {
+        return metadataService.getFlowFields();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
new file mode 100644
index 0000000..e456fa2
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.flow.FlowPersistenceProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.registry.provider.flow.FileSystemFlowPersistenceProvider
+org.apache.nifi.registry.provider.flow.git.GitFlowPersistenceProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.EventHookProvider
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.EventHookProvider b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.EventHookProvider
new file mode 100644
index 0000000..2676a35
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.hook.EventHookProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.registry.provider.hook.ScriptEventHookProvider
+org.apache.nifi.registry.provider.hook.LoggingEventHookProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authentication.IdentityProvider
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authentication.IdentityProvider b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authentication.IdentityProvider
new file mode 100644
index 0000000..530528f
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authentication.IdentityProvider
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.registry.security.ldap.LdapIdentityProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.AccessPolicyProvider
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.AccessPolicyProvider b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.AccessPolicyProvider
new file mode 100644
index 0000000..f57163f
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.AccessPolicyProvider
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.registry.security.authorization.file.FileAccessPolicyProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.Authorizer
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.Authorizer b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.Authorizer
new file mode 100644
index 0000000..b564fbb
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.Authorizer
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.registry.security.authorization.StandardManagedAuthorizer
+org.apache.nifi.registry.security.authorization.file.FileAuthorizer
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.UserGroupProvider
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.UserGroupProvider b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.UserGroupProvider
new file mode 100644
index 0000000..ee28c07
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/META-INF/services/org.apache.nifi.registry.security.authorization.UserGroupProvider
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.registry.security.authorization.CompositeUserGroupProvider
+org.apache.nifi.registry.security.authorization.CompositeConfigurableUserGroupProvider
+org.apache.nifi.registry.security.authorization.file.FileUserGroupProvider
+org.apache.nifi.registry.security.ldap.tenants.LdapUserGroupProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/V2__Initial.sql
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/V2__Initial.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/V2__Initial.sql
new file mode 100644
index 0000000..b992d23
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/migration/V2__Initial.sql
@@ -0,0 +1,60 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- The NAME column has a max size of 768 because this is the largest size that MySQL allows when using a unique constraint.
+CREATE TABLE BUCKET (
+    ID VARCHAR(50) NOT NULL,
+    NAME VARCHAR(1000) NOT NULL,
+    DESCRIPTION TEXT,
+    CREATED TIMESTAMP NOT NULL,
+    CONSTRAINT PK__BUCKET_ID PRIMARY KEY (ID),
+    CONSTRAINT UNIQUE__BUCKET_NAME UNIQUE (NAME)
+);
+
+CREATE TABLE BUCKET_ITEM (
+    ID VARCHAR(50) NOT NULL,
+    NAME VARCHAR(1000) NOT NULL,
+    DESCRIPTION TEXT,
+    CREATED TIMESTAMP NOT NULL,
+    MODIFIED TIMESTAMP NOT NULL,
+    ITEM_TYPE VARCHAR(50) NOT NULL,
+    BUCKET_ID VARCHAR(50) NOT NULL,
+    CONSTRAINT PK__BUCKET_ITEM_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__BUCKET_ITEM_BUCKET_ID FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID)
+);
+
+CREATE TABLE FLOW (
+    ID VARCHAR(50) NOT NULL,
+    CONSTRAINT PK__FLOW_ID PRIMARY KEY (ID),
+    CONSTRAINT FK__FLOW_BUCKET_ITEM_ID FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID)
+);
+
+CREATE TABLE FLOW_SNAPSHOT (
+    FLOW_ID VARCHAR(50) NOT NULL,
+    VERSION INT NOT NULL,
+    CREATED TIMESTAMP NOT NULL,
+    CREATED_BY VARCHAR(4096) NOT NULL,
+    COMMENTS TEXT,
+    CONSTRAINT PK__FLOW_SNAPSHOT_FLOW_ID_AND_VERSION PRIMARY KEY (FLOW_ID, VERSION),
+    CONSTRAINT FK__FLOW_SNAPSHOT_FLOW_ID FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID)
+);
+
+CREATE TABLE SIGNING_KEY (
+    ID VARCHAR(50) NOT NULL,
+    TENANT_IDENTITY VARCHAR(4096) NOT NULL,
+    KEY_VALUE VARCHAR(50) NOT NULL,
+    CONSTRAINT PK__SIGNING_KEY_ID PRIMARY KEY (ID),
+    CONSTRAINT UNIQUE__SIGNING_KEY_TENANT_IDENTITY UNIQUE (TENANT_IDENTITY)
+);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1.2__IncreaseColumnSizes.sql
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1.2__IncreaseColumnSizes.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1.2__IncreaseColumnSizes.sql
new file mode 100644
index 0000000..b2e92d5
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1.2__IncreaseColumnSizes.sql
@@ -0,0 +1,25 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+ALTER TABLE BUCKET ALTER COLUMN NAME VARCHAR2(1000);
+ALTER TABLE BUCKET ALTER COLUMN DESCRIPTION VARCHAR2(65535);
+
+ALTER TABLE BUCKET_ITEM ALTER COLUMN NAME VARCHAR2(1000);
+ALTER TABLE BUCKET_ITEM ALTER COLUMN DESCRIPTION VARCHAR2(65535);
+
+ALTER TABLE FLOW_SNAPSHOT ALTER COLUMN CREATED_BY VARCHAR2(4096);
+ALTER TABLE FLOW_SNAPSHOT ALTER COLUMN COMMENTS VARCHAR2(65535);
+
+ALTER TABLE SIGNING_KEY ALTER COLUMN TENANT_IDENTITY VARCHAR2(4096);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1.3__DropBucketItemNameUniqueness.sql
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1.3__DropBucketItemNameUniqueness.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1.3__DropBucketItemNameUniqueness.sql
new file mode 100644
index 0000000..f29b4d0
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1.3__DropBucketItemNameUniqueness.sql
@@ -0,0 +1,27 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE ALIAS IF NOT EXISTS EXECUTE AS $$ void executeSql(Connection conn, String sql)
+throws SQLException { conn.createStatement().executeUpdate(sql); } $$;
+
+call execute('ALTER TABLE BUCKET_ITEM DROP CONSTRAINT ' ||
+    (
+     SELECT DISTINCT CONSTRAINT_NAME
+     FROM INFORMATION_SCHEMA.CONSTRAINTS
+     WHERE TABLE_NAME = 'BUCKET_ITEM'
+     AND COLUMN_LIST = 'NAME'
+     AND CONSTRAINT_TYPE = 'UNIQUE'
+     )
+);

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1__Initial.sql
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1__Initial.sql b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1__Initial.sql
new file mode 100644
index 0000000..a6b4960
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/resources/db/original/V1__Initial.sql
@@ -0,0 +1,54 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+CREATE TABLE BUCKET (
+    ID VARCHAR2(50) NOT NULL PRIMARY KEY,
+    NAME VARCHAR2(200) NOT NULL UNIQUE,
+    DESCRIPTION VARCHAR(4096),
+    CREATED TIMESTAMP NOT NULL
+);
+
+CREATE TABLE BUCKET_ITEM (
+    ID VARCHAR2(50) NOT NULL PRIMARY KEY,
+    NAME VARCHAR2(200) NOT NULL UNIQUE,
+    DESCRIPTION VARCHAR(4096),
+    CREATED TIMESTAMP NOT NULL,
+    MODIFIED TIMESTAMP NOT NULL,
+    ITEM_TYPE VARCHAR(50) NOT NULL,
+    BUCKET_ID VARCHAR2(50) NOT NULL,
+    FOREIGN KEY (BUCKET_ID) REFERENCES BUCKET(ID)
+);
+
+CREATE TABLE FLOW (
+    ID VARCHAR2(50) NOT NULL PRIMARY KEY,
+    FOREIGN KEY (ID) REFERENCES BUCKET_ITEM(ID)
+);
+
+CREATE TABLE FLOW_SNAPSHOT (
+    FLOW_ID VARCHAR2(50) NOT NULL,
+    VERSION INT NOT NULL,
+    CREATED TIMESTAMP NOT NULL,
+    CREATED_BY VARCHAR2(200) NOT NULL,
+    COMMENTS VARCHAR(4096),
+    PRIMARY KEY (FLOW_ID, VERSION),
+    FOREIGN KEY (FLOW_ID) REFERENCES FLOW(ID)
+);
+
+CREATE TABLE SIGNING_KEY (
+    ID VARCHAR2(50) NOT NULL,
+    TENANT_IDENTITY VARCHAR2(50) NOT NULL UNIQUE,
+    KEY_VALUE VARCHAR2(50) NOT NULL,
+    PRIMARY KEY (ID)
+);
\ No newline at end of file