You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/09/01 15:07:25 UTC

[1/3] nifi-registry git commit: NIFIREG-10 Implementing Registry service layer and connecting to REST end-points - Adding FlowSnapshotSerializer and JAXBFlowSnapshotSerializer - Adding bean validation 2.0.0 using Hibernate validator as the implementation

Repository: nifi-registry
Updated Branches:
  refs/heads/master 88cae4f15 -> a1629c86d


http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/FlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/FlowResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/FlowResource.java
index 5ad934d..f210a3d 100644
--- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/FlowResource.java
+++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/FlowResource.java
@@ -20,8 +20,11 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.registry.flow.VersionedFlow;
 import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.service.RegistryService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,6 +38,8 @@ import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.util.Set;
+import java.util.SortedSet;
 
 @Path("/flows")
 @Api(
@@ -45,6 +50,12 @@ public class FlowResource {
 
     private static final Logger logger = LoggerFactory.getLogger(FlowResource.class);
 
+    private final RegistryService registryService;
+
+    public FlowResource(final RegistryService registryService) {
+        this.registryService = registryService;
+    }
+
     @GET
     @Consumes(MediaType.WILDCARD)
     @Produces(MediaType.APPLICATION_JSON)
@@ -54,9 +65,8 @@ public class FlowResource {
             responseContainer = "List"
     )
     public Response getFlows() {
-        // TODO implement getFlows
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+        final Set<VersionedFlow> flows = registryService.getFlows();
+        return Response.status(Response.Status.OK).entity(flows).build();
     }
 
     @GET
@@ -72,11 +82,9 @@ public class FlowResource {
                     @ApiResponse(code = 404, message = "The specified resource could not be found."),
             }
     )
-    public Response getFlow(
-            @PathParam("flowId") String flowId) {
-        // TODO implement getFlow
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response getFlow(@PathParam("flowId") final String flowId) {
+        final VersionedFlow flow = registryService.getFlow(flowId);
+        return Response.status(Response.Status.OK).entity(flow).build();
     }
 
     @PUT
@@ -92,11 +100,25 @@ public class FlowResource {
                     @ApiResponse(code = 404, message = "The specified resource could not be found."),
             }
     )
-    public Response updateFlow(
-            @PathParam("flowId") String flowId) {
-        // TODO implement updateFlow
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response updateFlow(@PathParam("flowId") final String flowId, final VersionedFlow flow) {
+        // Reject requests that lack either the path id or a request body.
+        if (StringUtils.isBlank(flowId)) {
+            throw new IllegalArgumentException("Flow Id cannot be blank");
+        }
+        if (flow == null) {
+            throw new IllegalArgumentException("Flow cannot be null");
+        }
+
+        // The body may omit its id (the path id is then used) but may not contradict the path.
+        final String bodyId = flow.getIdentifier();
+        if (bodyId == null) {
+            flow.setIdentifier(flowId);
+        } else if (!flowId.equals(bodyId)) {
+            throw new IllegalArgumentException("Flow id in path param must match flow id in body");
+        }
+
+        // Hand the update to the service layer and return its result with 200 OK.
+        return Response.status(Response.Status.OK).entity(registryService.updateFlow(flow)).build();
     }
 
     @DELETE
@@ -112,11 +134,9 @@ public class FlowResource {
                     @ApiResponse(code = 404, message = "The specified resource could not be found."),
             }
     )
-    public Response deleteFlow(
-            @PathParam("flowId") String flowId) {
-        // TODO implement deleteFlow
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response deleteFlow(@PathParam("flowId") final String flowId) {
+        final VersionedFlow deletedFlow = registryService.deleteFlow(flowId);
+        return Response.status(Response.Status.OK).entity(deletedFlow).build();
     }
 
     @POST
@@ -128,11 +148,26 @@ public class FlowResource {
             "The version number is created by the server and a location URI for the created version resource is returned.",
             response = VersionedFlowSnapshot.class
     )
-    public Response createFlowVersion(
-            @PathParam("flowId") String flowId) {
-        // TODO implement createFlowVersion
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response createFlowVersion(@PathParam("flowId") final String flowId, final VersionedFlowSnapshot snapshot) {
+        if (StringUtils.isBlank(flowId)) {
+            throw new IllegalArgumentException("Flow Id cannot be blank");
+        }
+        if (snapshot == null) {
+            throw new IllegalArgumentException("VersionedFlowSnapshot cannot be null");
+        }
+
+        // A flow id carried in the body's metadata must agree with the id in the URL.
+        if (snapshot.getSnapshotMetadata() != null && snapshot.getSnapshotMetadata().getFlowIdentifier() != null
+                && !flowId.equals(snapshot.getSnapshotMetadata().getFlowIdentifier())) {
+            throw new IllegalArgumentException("Flow id in path param must match flow id in body");
+        }
+
+        // When the metadata omits the flow id, default it to the path param (same rule as updateFlow).
+        if (snapshot.getSnapshotMetadata() != null && snapshot.getSnapshotMetadata().getFlowIdentifier() == null) {
+            snapshot.getSnapshotMetadata().setFlowIdentifier(flowId);
+        }
+
+        // Creation is delegated to the service layer; its result is returned with 200 OK.
+        return Response.status(Response.Status.OK).entity(registryService.createFlowSnapshot(snapshot)).build();
     }
 
     @GET
@@ -141,9 +176,7 @@ public class FlowResource {
     @Produces(MediaType.APPLICATION_JSON)
     @ApiOperation(
             value = "Get summary of all versions of a flow for a given flow ID.",
-            response = VersionedFlowSnapshot.class, /* TODO, add a JSON serialization view for VersionedFlowSnapshot
-                                                       for this endpoint that  hides the flowContents property when
-                                                       this object is returned as part of a collection. */
+            response = VersionedFlowSnapshotMetadata.class,
             responseContainer = "List"
     )
     @ApiResponses(
@@ -151,11 +184,9 @@ public class FlowResource {
                     @ApiResponse(code = 404, message = "The specified resource could not be found."),
             }
     )
-    public Response getFlowVersions(
-            @PathParam("flowId") String flowId) {
-        // TODO implement getFlowVersions
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response getFlowVersions(@PathParam("flowId") final String flowId) {
+        final VersionedFlow flow = registryService.getFlow(flowId);
+        return Response.status(Response.Status.OK).entity(flow.getSnapshotMetadata()).build();
     }
 
     @GET
@@ -171,11 +202,20 @@ public class FlowResource {
                     @ApiResponse(code = 404, message = "The specified resource could not be found."),
             }
     )
-    public Response getLatestFlowVersion(
-            @PathParam("flowId") String flowId) {
-        // TODO implement getLatestFlowVersion
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response getLatestFlowVersion(@PathParam("flowId") final String flowId) {
+        // Snapshot metadata comes back as a SortedSet, so last() is the entry that sorts highest.
+        final SortedSet<VersionedFlowSnapshotMetadata> versions = registryService.getFlow(flowId).getSnapshotMetadata();
+        if (versions == null || versions.isEmpty()) {
+            // A flow without any saved version has no "latest" to return.
+            return Response.status(Response.Status.NOT_FOUND).build();
+        }
+
+        // The set carries metadata objects; the snapshot itself is looked up by flow id and version.
+        final VersionedFlowSnapshotMetadata latest = versions.last();
+        final VersionedFlowSnapshot latestSnapshot =
+                registryService.getFlowSnapshot(latest.getFlowIdentifier(), latest.getVersion());
+
+        return Response.status(Response.Status.OK).entity(latestSnapshot).build();
     }
 
     @GET
@@ -192,11 +232,10 @@ public class FlowResource {
             }
     )
     public Response getFlowVersion(
-            @PathParam("flowId") String flowId,
-            @PathParam("versionNumber") Integer versionNumber) {
-        // TODO implement getFlowVersion
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+            @PathParam("flowId") final String flowId,
+            @PathParam("versionNumber") final Integer versionNumber) {
+        final VersionedFlowSnapshot snapshot = registryService.getFlowSnapshot(flowId, versionNumber);
+        return Response.status(Response.Status.OK).entity(snapshot).build();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/IllegalStateExceptionMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/IllegalStateExceptionMapper.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/IllegalStateExceptionMapper.java
new file mode 100644
index 0000000..0cc98fc
--- /dev/null
+++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/IllegalStateExceptionMapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.web.mapper;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+/** Translates an IllegalStateException into a 409 (Conflict) response whose plain-text body is the exception message. */
+@Provider
+public class IllegalStateExceptionMapper implements ExceptionMapper<IllegalStateException> {
+
+    private static final Logger logger = LoggerFactory.getLogger(IllegalStateExceptionMapper.class);
+
+    @Override
+    public Response toResponse(IllegalStateException exception) {
+        final Response.Status status = Response.Status.CONFLICT;
+        // One-line summary at INFO; the stack trace is only written when DEBUG is enabled.
+        logger.info("{}. Returning {} response.", exception, status);
+        if (logger.isDebugEnabled()) {
+            logger.debug(StringUtils.EMPTY, exception);
+        }
+
+        return Response.status(status).entity(exception.getMessage()).type("text/plain").build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/ResourceNotFoundExceptionMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/ResourceNotFoundExceptionMapper.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/ResourceNotFoundExceptionMapper.java
new file mode 100644
index 0000000..40d3f36
--- /dev/null
+++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/ResourceNotFoundExceptionMapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.web.mapper;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.exception.ResourceNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+/** Translates a ResourceNotFoundException into a 404 (Not Found) response whose plain-text body is the exception message. */
+@Provider
+public class ResourceNotFoundExceptionMapper implements ExceptionMapper<ResourceNotFoundException> {
+
+    private static final Logger logger = LoggerFactory.getLogger(ResourceNotFoundExceptionMapper.class);
+
+    @Override
+    public Response toResponse(ResourceNotFoundException exception) {
+        final Response.Status status = Response.Status.NOT_FOUND;
+        // One-line summary at INFO; the stack trace is only written when DEBUG is enabled.
+        logger.info("{}. Returning {} response.", exception, status);
+        if (logger.isDebugEnabled()) {
+            logger.debug(StringUtils.EMPTY, exception);
+        }
+
+        return Response.status(status).entity(exception.getMessage()).type("text/plain").build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/SerializationExceptionMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/SerializationExceptionMapper.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/SerializationExceptionMapper.java
new file mode 100644
index 0000000..e24963d
--- /dev/null
+++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/mapper/SerializationExceptionMapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.web.mapper;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.serialization.SerializationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+/** Translates a SerializationException into a 500 (Internal Server Error) response whose plain-text body is the exception message. */
+@Provider
+public class SerializationExceptionMapper implements ExceptionMapper<SerializationException> {
+
+    private static final Logger logger = LoggerFactory.getLogger(SerializationExceptionMapper.class);
+
+    @Override
+    public Response toResponse(SerializationException exception) {
+        final Response.Status status = Response.Status.INTERNAL_SERVER_ERROR;
+        // One-line summary at INFO; the stack trace is only written when DEBUG is enabled.
+        logger.info("{}. Returning {} response.", exception, status);
+        if (logger.isDebugEnabled()) {
+            logger.debug(StringUtils.EMPTY, exception);
+        }
+
+        return Response.status(status).entity(exception.getMessage()).type("text/plain").build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/TestRestAPI.java
----------------------------------------------------------------------
diff --git a/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/TestRestAPI.java b/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/TestRestAPI.java
new file mode 100644
index 0000000..fc980f9
--- /dev/null
+++ b/nifi-registry-web-api/src/test/java/org/apache/nifi/registry/web/TestRestAPI.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.web;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/** Manual smoke test for a registry at localhost:8080: creates a bucket, a flow and two snapshots, then reads the flow back. */
+public class TestRestAPI {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(TestRestAPI.class);
+
+    public static final String REGISTRY_API_URL = "http://localhost:8080/nifi-registry-api";
+    public static final String REGISTRY_API_BUCKETS_URL = REGISTRY_API_URL + "/buckets";
+    public static final String REGISTRY_API_FLOWS_URL = REGISTRY_API_URL + "/flows";
+
+    /** Runs the scenario against REGISTRY_API_URL; prints progress to stdout and logs any WebApplicationException. */
+    public static void main(String[] args) {
+        final Client client = ClientBuilder.newClient();
+        try {
+            // Create a bucket
+            final Bucket bucket = new Bucket();
+            bucket.setName("First Bucket");
+            bucket.setDescription("This is the first bucket.");
+
+            final Bucket createdBucket = client.target(REGISTRY_API_BUCKETS_URL)
+                    .request()
+                    .post(
+                            Entity.entity(bucket, MediaType.APPLICATION_JSON),
+                            Bucket.class
+                    );
+
+            System.out.println("Created bucket with id: " + createdBucket.getIdentifier());
+
+            // Create a flow
+            final VersionedFlow versionedFlow = new VersionedFlow();
+            versionedFlow.setName("First Flow");
+            versionedFlow.setDescription("This is the first flow.");
+
+            final VersionedFlow createdFlow = client.target(REGISTRY_API_BUCKETS_URL)
+                    .path("/{bucketId}/flows")
+                    .resolveTemplate("bucketId", createdBucket.getIdentifier())
+                    .request()
+                    .post(
+                            Entity.entity(versionedFlow, MediaType.APPLICATION_JSON),
+                            VersionedFlow.class
+                    );
+
+            System.out.println("Created flow with id: " + createdFlow.getIdentifier());
+
+            // Create first snapshot for the flow
+            final VersionedFlowSnapshotMetadata snapshotMetadata1 = new VersionedFlowSnapshotMetadata();
+            snapshotMetadata1.setBucketIdentifier(createdBucket.getIdentifier());
+            snapshotMetadata1.setFlowIdentifier(createdFlow.getIdentifier());
+            snapshotMetadata1.setFlowName(createdFlow.getName());
+            snapshotMetadata1.setVersion(1);
+            snapshotMetadata1.setComments("This is snapshot #1.");
+
+            final VersionedProcessGroup snapshotContents1 = new VersionedProcessGroup();
+            snapshotContents1.setIdentifier("pg1");
+            snapshotContents1.setName("Process Group 1");
+
+            final VersionedFlowSnapshot snapshot1 = new VersionedFlowSnapshot();
+            snapshot1.setSnapshotMetadata(snapshotMetadata1);
+            snapshot1.setFlowContents(snapshotContents1);
+
+            final VersionedFlowSnapshot createdSnapshot1 = client.target(REGISTRY_API_FLOWS_URL)
+                    .path("/{flowId}/versions")
+                    .resolveTemplate("flowId", createdFlow.getIdentifier())
+                    .request()
+                    .post(
+                            Entity.entity(snapshot1, MediaType.APPLICATION_JSON_TYPE),
+                            VersionedFlowSnapshot.class
+                    );
+
+            System.out.println("Created snapshot with version: " + createdSnapshot1.getSnapshotMetadata().getVersion());
+
+            // Create second snapshot for the flow
+            final VersionedFlowSnapshotMetadata snapshotMetadata2 = new VersionedFlowSnapshotMetadata();
+            snapshotMetadata2.setBucketIdentifier(createdBucket.getIdentifier());
+            snapshotMetadata2.setFlowIdentifier(createdFlow.getIdentifier());
+            snapshotMetadata2.setFlowName(createdFlow.getName());
+            snapshotMetadata2.setVersion(2);
+            snapshotMetadata2.setComments("This is snapshot #2.");
+
+            final VersionedProcessGroup snapshotContents2 = new VersionedProcessGroup();
+            snapshotContents2.setIdentifier("pg1");
+            snapshotContents2.setName("Process Group 1 New Name");
+
+            final VersionedFlowSnapshot snapshot2 = new VersionedFlowSnapshot();
+            snapshot2.setSnapshotMetadata(snapshotMetadata2);
+            snapshot2.setFlowContents(snapshotContents2);
+
+            final VersionedFlowSnapshot createdSnapshot2 = client.target(REGISTRY_API_FLOWS_URL)
+                    .path("/{flowId}/versions")
+                    .resolveTemplate("flowId", createdFlow.getIdentifier())
+                    .request()
+                    .post(
+                            Entity.entity(snapshot2, MediaType.APPLICATION_JSON_TYPE),
+                            VersionedFlowSnapshot.class
+                    );
+
+            System.out.println("Created snapshot with version: " + createdSnapshot2.getSnapshotMetadata().getVersion());
+
+            // Retrieve the flow by id
+            final Response flowResponse = client.target(REGISTRY_API_FLOWS_URL)
+                    .path("/{flowId}")
+                    .resolveTemplate("flowId", createdFlow.getIdentifier())
+                    .request()
+                    .get();
+
+            final String flowJson = flowResponse.readEntity(String.class);
+            System.out.println("Flow: " + flowJson);
+        } catch (WebApplicationException e) {
+            // Log the HTTP failure and then the response body the server sent with it.
+            LOGGER.error(e.getMessage(), e);
+
+            final Response response = e.getResponse();
+            LOGGER.error(response.readEntity(String.class));
+        } finally {
+            // Always release the client, whether the scenario succeeded or failed.
+            client.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6538337..47cf327 100644
--- a/pom.xml
+++ b/pom.xml
@@ -244,6 +244,32 @@
                 <version>${jersey.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.glassfish.jersey.ext</groupId>
+                <artifactId>jersey-bean-validation</artifactId>
+                <version>${jersey.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.glassfish.web</groupId>
+                        <artifactId>javax.el</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
+                <groupId>javax.validation</groupId>
+                <artifactId>validation-api</artifactId>
+                <version>2.0.0.Final</version>
+            </dependency>
+            <dependency>
+                <groupId>org.hibernate</groupId>
+                <artifactId>hibernate-validator</artifactId>
+                <version>6.0.2.Final</version>
+            </dependency>
+            <dependency>
+                <groupId>org.glassfish</groupId>
+                <artifactId>javax.el</artifactId>
+                <version>3.0.1-b08</version>
+            </dependency>
+            <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-lang3</artifactId>
                 <version>3.5</version>
@@ -253,6 +279,17 @@
                 <artifactId>commons-io</artifactId>
                 <version>2.5</version>
             </dependency>
+            <dependency>
+                <groupId>io.swagger</groupId>
+                <artifactId>swagger-annotations</artifactId>
+                <version>1.5.16</version>
+            </dependency>
+            <dependency>
+                <groupId>org.mockito</groupId>
+                <artifactId>mockito-core</artifactId>
+                <version>2.7.22</version>
+                <scope>test</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 


[2/3] nifi-registry git commit: NIFIREG-10 Implementing Registry service layer and connecting to REST end-points - Adding FlowSnapshotSerializer and JAXBFlowSnapshotSerializer - Adding bean validation 2.0.0 using Hibernate validator as the implementation

Posted by bb...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
new file mode 100644
index 0000000..8762706
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestRegistryService.java
@@ -0,0 +1,936 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.exception.ResourceNotFoundException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.MetadataProvider;
+import org.apache.nifi.registry.metadata.StandardBucketMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata;
+import org.apache.nifi.registry.serialization.FlowSnapshotSerializer;
+import org.apache.nifi.registry.serialization.Serializer;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import javax.validation.ConstraintViolationException;
+import javax.validation.Validation;
+import javax.validation.Validator;
+import javax.validation.ValidatorFactory;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestRegistryService {
+
+    // Collaborators handed to the RegistryService under test; setup() assigns Mockito mocks to them.
+    private MetadataProvider metadataProvider;
+    private FlowPersistenceProvider flowPersistenceProvider;
+    private Serializer<VersionedFlowSnapshot> snapshotSerializer;
+    // Obtained from the default ValidatorFactory in setup() rather than mocked.
+    private Validator validator;
+
+    // Instance under test, constructed in setup() from the four collaborators above.
+    private RegistryService registryService;
+
+    /**
+     * Builds the RegistryService under test from Mockito mocks of the providers and
+     * serializer plus a Validator taken from the default ValidatorFactory.
+     */
+    @Before
+    public void setup() {
+        validator = Validation.buildDefaultValidatorFactory().getValidator();
+
+        snapshotSerializer = mock(FlowSnapshotSerializer.class);
+        flowPersistenceProvider = mock(FlowPersistenceProvider.class);
+        metadataProvider = mock(MetadataProvider.class);
+
+        registryService = new RegistryService(metadataProvider, flowPersistenceProvider, snapshotSerializer, validator);
+    }
+
+    // ---------------------- Test Bucket methods ---------------------------------------------
+
+    /**
+     * Creates a bucket whose name the metadata provider does not know and checks that the
+     * result has an identifier, a created timestamp, and the requested name and description.
+     */
+    @Test
+    public void testCreateBucketValid() {
+        final Bucket requested = new Bucket();
+        requested.setName("My Bucket");
+        requested.setDescription("This is my bucket.");
+
+        // no bucket with this name exists; creation is routed through the answer helper
+        when(metadataProvider.getBucketByName(requested.getName())).thenReturn(null);
+        doAnswer(createBucketAnswer()).when(metadataProvider).createBucket(any(BucketMetadata.class));
+
+        final Bucket result = registryService.createBucket(requested);
+
+        assertNotNull(result);
+        assertNotNull(result.getIdentifier());
+        assertNotNull(result.getCreatedTimestamp());
+        assertEquals(requested.getName(), result.getName());
+        assertEquals(requested.getDescription(), result.getDescription());
+    }
+
+    /**
+     * Expects an IllegalStateException from createBucket when the metadata provider
+     * already returns a bucket for the requested name.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testCreateBucketWithSameName() {
+        final Bucket duplicate = new Bucket();
+        duplicate.setName("My Bucket");
+        duplicate.setDescription("This is my bucket.");
+
+        // the name lookup resolves to an already-stored bucket
+        final BucketMetadata alreadyStored = new StandardBucketMetadata.Builder()
+                .identifier("b1")
+                .name("My Bucket")
+                .created(System.currentTimeMillis())
+                .build();
+        when(metadataProvider.getBucketByName(duplicate.getName())).thenReturn(alreadyStored);
+
+        registryService.createBucket(duplicate);
+    }
+
+    /**
+     * Expects a ConstraintViolationException when createBucket receives a bucket on which
+     * no name was set.
+     */
+    @Test(expected = ConstraintViolationException.class)
+    public void testCreateBucketWithMissingName() {
+        final Bucket unnamed = new Bucket();
+
+        when(metadataProvider.getBucketByName(unnamed.getName())).thenReturn(null);
+
+        registryService.createBucket(unnamed);
+    }
+
+    /**
+     * Looks up a bucket the metadata provider knows by id and checks that identifier,
+     * name, description and created timestamp are carried over to the returned Bucket.
+     */
+    @Test
+    public void testGetExistingBucket() {
+        final BucketMetadata stored = new StandardBucketMetadata.Builder()
+                .identifier("b1")
+                .name("My Bucket")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+        when(metadataProvider.getBucketById(stored.getIdentifier())).thenReturn(stored);
+
+        final Bucket found = registryService.getBucket(stored.getIdentifier());
+
+        assertNotNull(found);
+        assertEquals(stored.getIdentifier(), found.getIdentifier());
+        assertEquals(stored.getName(), found.getName());
+        assertEquals(stored.getDescription(), found.getDescription());
+        assertEquals(stored.getCreatedTimestamp(), found.getCreatedTimestamp());
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from getBucket when the provider finds no bucket.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testGetBucketDoesNotExist() {
+        // every id lookup comes back empty
+        when(metadataProvider.getBucketById(any(String.class))).thenReturn(null);
+
+        registryService.getBucket("does-not-exist");
+    }
+
+    /**
+     * Expects an IllegalArgumentException from updateBucket when the bucket has no identifier.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testUpdateBucketWithoutId() {
+        // name and description are set, the identifier deliberately is not
+        final Bucket withoutId = new Bucket();
+        withoutId.setDescription("This is my bucket.");
+        withoutId.setName("My Bucket");
+
+        registryService.updateBucket(withoutId);
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from updateBucket when the metadata provider
+     * has no bucket for the supplied identifier.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testUpdateBucketDoesNotExist() {
+        final Bucket bucket = new Bucket();
+        bucket.setIdentifier("b1");
+        bucket.setName("My Bucket");
+        bucket.setDescription("This is my bucket.");
+
+        // stub the lookup before the single call under test, so the exception is
+        // raised by the explicitly configured missing-bucket path
+        when(metadataProvider.getBucketById(any(String.class))).thenReturn(null);
+        registryService.updateBucket(bucket);
+    }
+
+    /**
+     * Expects an IllegalStateException when a bucket is renamed to a name that the
+     * metadata provider resolves to a different bucket.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testUpdateBucketWithSameNameAsExistingBucket() {
+        final BucketMetadata target = new StandardBucketMetadata.Builder()
+                .identifier("b1")
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        final BucketMetadata nameOwner = new StandardBucketMetadata.Builder()
+                .identifier("b2")
+                .name("My Bucket #2")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getBucketById(target.getIdentifier())).thenReturn(target);
+        when(metadataProvider.getBucketByName(nameOwner.getName())).thenReturn(nameOwner);
+
+        // rename "b1" to the name that "b2" already holds
+        final Bucket rename = new Bucket();
+        rename.setIdentifier(target.getIdentifier());
+        rename.setName("My Bucket #2");
+        rename.setDescription(target.getDescription());
+
+        registryService.updateBucket(rename);
+    }
+
+    /**
+     * Updates both name and description of a known bucket and checks that the returned
+     * bucket reflects the new values.
+     */
+    @Test
+    public void testUpdateBucket() {
+        final BucketMetadata stored = new StandardBucketMetadata.Builder()
+                .identifier("b1")
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getBucketById(stored.getIdentifier())).thenReturn(stored);
+        doAnswer(updateBucketAnswer()).when(metadataProvider).updateBucket(any(BucketMetadata.class));
+
+        final Bucket changes = new Bucket();
+        changes.setIdentifier(stored.getIdentifier());
+        changes.setName("Updated Name");
+        changes.setDescription("Updated Description");
+
+        final Bucket outcome = registryService.updateBucket(changes);
+
+        assertNotNull(outcome);
+        assertEquals(changes.getName(), outcome.getName());
+        assertEquals(changes.getDescription(), outcome.getDescription());
+    }
+
+    /**
+     * Updates a known bucket with a new name and a null description, and checks that the
+     * name changes while the stored description is retained.
+     */
+    @Test
+    public void testUpdateBucketPartial() {
+        final BucketMetadata stored = new StandardBucketMetadata.Builder()
+                .identifier("b1")
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getBucketById(stored.getIdentifier())).thenReturn(stored);
+        doAnswer(updateBucketAnswer()).when(metadataProvider).updateBucket(any(BucketMetadata.class));
+
+        // only the name is supplied; the description is explicitly null
+        final Bucket changes = new Bucket();
+        changes.setIdentifier(stored.getIdentifier());
+        changes.setName("Updated Name");
+        changes.setDescription(null);
+
+        final Bucket outcome = registryService.updateBucket(changes);
+
+        assertNotNull(outcome);
+        assertEquals(changes.getName(), outcome.getName());
+        assertEquals(stored.getDescription(), outcome.getDescription());
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from deleteBucket when the id is unknown.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testDeleteBucketDoesNotExist() {
+        final String unknownId = "b1";
+
+        when(metadataProvider.getBucketById(unknownId)).thenReturn(null);
+
+        registryService.deleteBucket(unknownId);
+    }
+
+    /**
+     * Deletes a bucket that holds one flow and verifies that the flow persistence provider
+     * is asked exactly once to delete that flow's snapshots.
+     */
+    @Test
+    public void testDeleteBucketWithFlows() {
+        final BucketMetadata bucket = new StandardBucketMetadata.Builder()
+                .identifier("b1")
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        final FlowMetadata containedFlow = new StandardFlowMetadata.Builder()
+                .identifier("flow1")
+                .name("Flow 1")
+                .description("This is flow 1")
+                .created(System.currentTimeMillis())
+                .build();
+
+        final Set<FlowMetadata> bucketFlows = new HashSet<>();
+        bucketFlows.add(containedFlow);
+
+        when(metadataProvider.getBucketById(bucket.getIdentifier())).thenReturn(bucket);
+        when(metadataProvider.getFlows(bucket.getIdentifier())).thenReturn(bucketFlows);
+
+        final Bucket removed = registryService.deleteBucket(bucket.getIdentifier());
+
+        assertNotNull(removed);
+        assertEquals(bucket.getIdentifier(), removed.getIdentifier());
+
+        verify(flowPersistenceProvider, times(1))
+                .deleteSnapshots(eq(bucket.getIdentifier()), eq(containedFlow.getIdentifier()));
+    }
+
+    // ---------------------- Test VersionedFlow methods ---------------------------------------------
+
+    /**
+     * Expects a ConstraintViolationException when createFlow receives a flow with no
+     * fields populated.
+     */
+    @Test(expected = ConstraintViolationException.class)
+    public void testCreateFlowInvalid() {
+        final VersionedFlow emptyFlow = new VersionedFlow();
+
+        registryService.createFlow("b1", emptyFlow);
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from createFlow when the provider finds no
+     * bucket for the target bucket id.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testCreateFlowBucketDoesNotExist() {
+        final VersionedFlow flow = new VersionedFlow();
+        flow.setName("My Flow");
+        flow.setBucketIdentifier("b1");
+
+        // every bucket lookup comes back empty
+        when(metadataProvider.getBucketById(any(String.class))).thenReturn(null);
+
+        registryService.createFlow(flow.getBucketIdentifier(), flow);
+    }
+
+    /**
+     * Expects an IllegalStateException from createFlow when the metadata provider already
+     * returns a flow for the requested name.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testCreateFlowWithSameName() {
+        final BucketMetadata bucket = new StandardBucketMetadata.Builder()
+                .identifier("b1")
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        // a flow that already owns the name about to be requested
+        final FlowMetadata nameOwner = new StandardFlowMetadata.Builder()
+                .identifier("flow1")
+                .name("My Flow")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getBucketById(bucket.getIdentifier())).thenReturn(bucket);
+        when(metadataProvider.getFlowByName(nameOwner.getName())).thenReturn(nameOwner);
+
+        final VersionedFlow duplicate = new VersionedFlow();
+        duplicate.setName(nameOwner.getName());
+        duplicate.setBucketIdentifier("b1");
+
+        registryService.createFlow(duplicate.getBucketIdentifier(), duplicate);
+    }
+
+    /**
+     * Creates a flow in a known bucket and checks that the result has an identifier,
+     * positive created/modified timestamps, and the requested name, bucket and description.
+     */
+    @Test
+    public void testCreateFlowValid() {
+        final BucketMetadata bucket = new StandardBucketMetadata.Builder()
+                .identifier("b1")
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getBucketById(bucket.getIdentifier())).thenReturn(bucket);
+        doAnswer(createFlowAnswer()).when(metadataProvider).createFlow(any(String.class), any(FlowMetadata.class));
+
+        final VersionedFlow requested = new VersionedFlow();
+        requested.setName("My Flow");
+        requested.setBucketIdentifier("b1");
+
+        final VersionedFlow result = registryService.createFlow(requested.getBucketIdentifier(), requested);
+
+        assertNotNull(result);
+        assertNotNull(result.getIdentifier());
+        assertTrue(result.getCreatedTimestamp() > 0);
+        assertTrue(result.getModifiedTimestamp() > 0);
+        assertEquals(requested.getName(), result.getName());
+        assertEquals(requested.getBucketIdentifier(), result.getBucketIdentifier());
+        assertEquals(requested.getDescription(), result.getDescription());
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from getFlow when the provider finds no flow.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testGetFlowDoesNotExist() {
+        // every flow lookup comes back empty
+        when(metadataProvider.getFlowById(any(String.class))).thenReturn(null);
+
+        registryService.getFlow("flow1");
+    }
+
+    /**
+     * Looks up a flow the metadata provider knows by id and checks that every field of the
+     * stored metadata is carried over to the returned VersionedFlow.
+     */
+    @Test
+    public void testGetFlowExists() {
+        final FlowMetadata stored = new StandardFlowMetadata.Builder()
+                .identifier("flow1")
+                .name("My Flow")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+        when(metadataProvider.getFlowById(stored.getIdentifier())).thenReturn(stored);
+
+        final VersionedFlow found = registryService.getFlow(stored.getIdentifier());
+
+        assertNotNull(found);
+        assertEquals(stored.getIdentifier(), found.getIdentifier());
+        assertEquals(stored.getName(), found.getName());
+        assertEquals(stored.getDescription(), found.getDescription());
+        assertEquals(stored.getBucketIdentifier(), found.getBucketIdentifier());
+        assertEquals(stored.getCreatedTimestamp(), found.getCreatedTimestamp());
+        assertEquals(stored.getModifiedTimestamp(), found.getModifiedTimestamp());
+    }
+
+    /**
+     * Has the metadata provider return two flows and checks that getFlows() yields two
+     * VersionedFlow entries.
+     */
+    @Test
+    public void testGetFlows() {
+        final FlowMetadata first = new StandardFlowMetadata.Builder()
+                .identifier("flow1")
+                .name("My Flow")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        final FlowMetadata second = new StandardFlowMetadata.Builder()
+                .identifier("flow2")
+                .name("My Flow")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        final Set<FlowMetadata> storedFlows = new LinkedHashSet<>();
+        storedFlows.add(first);
+        storedFlows.add(second);
+        when(metadataProvider.getFlows()).thenReturn(storedFlows);
+
+        final Set<VersionedFlow> result = registryService.getFlows();
+
+        assertNotNull(result);
+        assertEquals(2, result.size());
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from getFlows(bucketId) when the bucket is unknown.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testGetFlowsByBucketDoesNotExist() {
+        // every bucket lookup comes back empty
+        when(metadataProvider.getBucketById(any(String.class))).thenReturn(null);
+
+        registryService.getFlows("b1");
+    }
+
+    /**
+     * Has the metadata provider return two flows for a known bucket and checks that
+     * getFlows(bucketId) yields two VersionedFlow entries.
+     */
+    @Test
+    public void testGetFlowsByBucketExists() {
+        final BucketMetadata bucket = new StandardBucketMetadata.Builder()
+                .identifier("b1")
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        final FlowMetadata first = new StandardFlowMetadata.Builder()
+                .identifier("flow1")
+                .name("My Flow")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        final FlowMetadata second = new StandardFlowMetadata.Builder()
+                .identifier("flow2")
+                .name("My Flow")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        final Set<FlowMetadata> bucketFlows = new LinkedHashSet<>();
+        bucketFlows.add(first);
+        bucketFlows.add(second);
+
+        when(metadataProvider.getBucketById(bucket.getIdentifier())).thenReturn(bucket);
+        when(metadataProvider.getFlows(bucket.getIdentifier())).thenReturn(bucketFlows);
+
+        final Set<VersionedFlow> result = registryService.getFlows(bucket.getIdentifier());
+
+        assertNotNull(result);
+        assertEquals(2, result.size());
+    }
+
+    /**
+     * Expects an IllegalArgumentException from updateFlow when the flow has no identifier.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testUpdateFlowWithoutId() {
+        final VersionedFlow withoutId = new VersionedFlow();
+
+        registryService.updateFlow(withoutId);
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from updateFlow when the provider finds no flow
+     * for the supplied identifier.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testUpdateFlowDoesNotExist() {
+        final VersionedFlow unknownFlow = new VersionedFlow();
+        unknownFlow.setIdentifier("flow1");
+
+        when(metadataProvider.getFlowById(unknownFlow.getIdentifier())).thenReturn(null);
+
+        registryService.updateFlow(unknownFlow);
+    }
+
+    /**
+     * Expects an IllegalStateException when a flow is renamed to a name that the metadata
+     * provider resolves to a different flow.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testUpdateFlowWithSameNameAsExistingFlow() {
+        final FlowMetadata target = new StandardFlowMetadata.Builder()
+                .identifier("flow1")
+                .name("My Flow 1")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        final FlowMetadata nameOwner = new StandardFlowMetadata.Builder()
+                .identifier("flow2")
+                .name("My Flow 2")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getFlowById(target.getIdentifier())).thenReturn(target);
+        when(metadataProvider.getFlowByName(nameOwner.getName())).thenReturn(nameOwner);
+
+        // rename "flow1" to the name that "flow2" already holds
+        final VersionedFlow rename = new VersionedFlow();
+        rename.setIdentifier(target.getIdentifier());
+        rename.setName(nameOwner.getName());
+
+        registryService.updateFlow(rename);
+    }
+
+    /**
+     * Updates name and description of a known flow and checks that those two fields change,
+     * that bucket id and created timestamp stay as stored, and that the modified timestamp
+     * moves forward.
+     */
+    @Test
+    public void testUpdateFlow() throws InterruptedException {
+        final FlowMetadata stored = new StandardFlowMetadata.Builder()
+                .identifier("flow1")
+                .name("My Flow 1")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getFlowById(stored.getIdentifier())).thenReturn(stored);
+        when(metadataProvider.getFlowByName(stored.getName())).thenReturn(stored);
+        doAnswer(updateFlowAnswer()).when(metadataProvider).updateFlow(any(FlowMetadata.class));
+
+        final VersionedFlow changes = new VersionedFlow();
+        changes.setIdentifier(stored.getIdentifier());
+        changes.setName("New Flow Name");
+        changes.setDescription("This is a new description");
+
+        // let the clock advance so a timestamp taken during the update can exceed the stored one
+        Thread.sleep(10);
+
+        final VersionedFlow outcome = registryService.updateFlow(changes);
+        assertNotNull(outcome);
+        assertEquals(changes.getIdentifier(), outcome.getIdentifier());
+
+        // fields supplied in the update
+        assertEquals(changes.getName(), outcome.getName());
+        assertEquals(changes.getDescription(), outcome.getDescription());
+
+        // fields that must keep their stored values
+        assertEquals(stored.getBucketIdentifier(), outcome.getBucketIdentifier());
+        assertEquals(stored.getCreatedTimestamp(), outcome.getCreatedTimestamp());
+
+        // the modified timestamp is expected to be refreshed
+        assertTrue(outcome.getModifiedTimestamp() > stored.getModifiedTimestamp());
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from deleteFlow when the provider finds no flow.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testDeleteFlowDoesNotExist() {
+        // every flow lookup comes back empty
+        when(metadataProvider.getFlowById(any(String.class))).thenReturn(null);
+
+        registryService.deleteFlow("flow1");
+    }
+
+    /**
+     * Deletes a known flow and verifies that its snapshots are removed through the flow
+     * persistence provider and the flow itself through the metadata provider, once each.
+     */
+    @Test
+    public void testDeleteFlowWithSnapshots() {
+        final FlowMetadata stored = new StandardFlowMetadata.Builder()
+                .identifier("flow1")
+                .name("My Flow 1")
+                .description("This is my flow.")
+                .bucketIdentifier("b1")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getFlowById(stored.getIdentifier())).thenReturn(stored);
+        when(metadataProvider.getFlowByName(stored.getName())).thenReturn(stored);
+
+        final VersionedFlow removed = registryService.deleteFlow(stored.getIdentifier());
+
+        assertNotNull(removed);
+        assertEquals(stored.getIdentifier(), removed.getIdentifier());
+
+        verify(flowPersistenceProvider, times(1))
+                .deleteSnapshots(stored.getBucketIdentifier(), stored.getIdentifier());
+        verify(metadataProvider, times(1))
+                .deleteFlow(stored.getIdentifier());
+    }
+
+    // ---------------------- Test VersionedFlowSnapshot methods ---------------------------------------------
+
+    /**
+     * Builds the snapshot fixture shared by the snapshot tests: version 1 of flow "flow1"
+     * in bucket "b1", with a process group "pg1" as its flow contents.
+     *
+     * @return a new snapshot with both metadata and flow contents populated
+     */
+    private VersionedFlowSnapshot createSnapshot() {
+        final VersionedProcessGroup contents = new VersionedProcessGroup();
+        contents.setIdentifier("pg1");
+        contents.setName("My Process Group");
+
+        final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
+        metadata.setFlowIdentifier("flow1");
+        metadata.setFlowName("First Flow");
+        metadata.setVersion(1);
+        metadata.setComments("This is the first snapshot");
+        metadata.setBucketIdentifier("b1");
+
+        final VersionedFlowSnapshot fixture = new VersionedFlowSnapshot();
+        fixture.setSnapshotMetadata(metadata);
+        fixture.setFlowContents(contents);
+        return fixture;
+    }
+
+    /**
+     * Expects a ConstraintViolationException when the snapshot metadata has a null flow name.
+     */
+    @Test(expected = ConstraintViolationException.class)
+    public void testCreateSnapshotInvalidMetadata() {
+        final VersionedFlowSnapshot withoutFlowName = createSnapshot();
+        withoutFlowName.getSnapshotMetadata().setFlowName(null);
+
+        registryService.createFlowSnapshot(withoutFlowName);
+    }
+
+    /**
+     * Expects a ConstraintViolationException when the snapshot's flow contents are null.
+     */
+    // NOTE(review): this body is the same as testCreateSnapshotNullFlowContents; if the intent
+    // was to cover non-null but invalid contents, the fixture needs a different mutation - confirm.
+    @Test(expected = ConstraintViolationException.class)
+    public void testCreateSnapshotInvalidFlowContents() {
+        final VersionedFlowSnapshot withoutContents = createSnapshot();
+        withoutContents.setFlowContents(null);
+
+        registryService.createFlowSnapshot(withoutContents);
+    }
+
+    /**
+     * Expects a ConstraintViolationException when the snapshot has no metadata at all.
+     */
+    @Test(expected = ConstraintViolationException.class)
+    public void testCreateSnapshotNullMetadata() {
+        final VersionedFlowSnapshot withoutMetadata = createSnapshot();
+        withoutMetadata.setSnapshotMetadata(null);
+
+        registryService.createFlowSnapshot(withoutMetadata);
+    }
+
+    /**
+     * Expects a ConstraintViolationException when the snapshot's flow contents are null.
+     */
+    @Test(expected = ConstraintViolationException.class)
+    public void testCreateSnapshotNullFlowContents() {
+        final VersionedFlowSnapshot withoutContents = createSnapshot();
+        withoutContents.setFlowContents(null);
+
+        registryService.createFlowSnapshot(withoutContents);
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from createFlowSnapshot when the provider finds
+     * no bucket for the snapshot's bucket id.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testCreateSnapshotBucketDoesNotExist() {
+        final VersionedFlowSnapshot fixture = createSnapshot();
+
+        // every bucket lookup comes back empty
+        when(metadataProvider.getBucketById(any(String.class))).thenReturn(null);
+
+        registryService.createFlowSnapshot(fixture);
+    }
+
+    /**
+     * Expects a ResourceNotFoundException from createFlowSnapshot when the bucket is known
+     * but the provider finds no flow for the snapshot's flow id.
+     */
+    @Test(expected = ResourceNotFoundException.class)
+    public void testCreateSnapshotFlowDoesNotExist() {
+        final VersionedFlowSnapshot fixture = createSnapshot();
+
+        final BucketMetadata bucket = new StandardBucketMetadata.Builder()
+                .identifier(fixture.getSnapshotMetadata().getBucketIdentifier())
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        // the bucket resolves, the flow does not
+        when(metadataProvider.getBucketById(bucket.getIdentifier())).thenReturn(bucket);
+        when(metadataProvider.getFlowById(fixture.getSnapshotMetadata().getFlowIdentifier())).thenReturn(null);
+
+        registryService.createFlowSnapshot(fixture);
+    }
+
+    /**
+     * Expects an IllegalStateException from createFlowSnapshot when the stored flow already
+     * holds a snapshot with the same version as the one being created.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testCreateSnapshotVersionAlreadyExists() {
+        final VersionedFlowSnapshot fixture = createSnapshot();
+
+        final BucketMetadata bucket = new StandardBucketMetadata.Builder()
+                .identifier(fixture.getSnapshotMetadata().getBucketIdentifier())
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getBucketById(bucket.getIdentifier())).thenReturn(bucket);
+
+        // stored snapshot that carries the same version number as the fixture
+        final FlowSnapshotMetadata sameVersion = new StandardFlowSnapshotMetadata.Builder()
+                .flowIdentifier(fixture.getSnapshotMetadata().getFlowIdentifier())
+                .flowName(fixture.getSnapshotMetadata().getFlowName())
+                .bucketIdentifier(fixture.getSnapshotMetadata().getBucketIdentifier())
+                .version(fixture.getSnapshotMetadata().getVersion())
+                .comments("This is an existing snapshot")
+                .created(System.currentTimeMillis())
+                .build();
+
+        // the flow lookup yields a flow that already contains that snapshot
+        final FlowMetadata storedFlow = new StandardFlowMetadata.Builder()
+                .identifier(fixture.getSnapshotMetadata().getFlowIdentifier())
+                .name("My Flow 1")
+                .description("This is my flow.")
+                .bucketIdentifier(fixture.getSnapshotMetadata().getBucketIdentifier())
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .addSnapshot(sameVersion)
+                .build();
+
+        when(metadataProvider.getFlowById(storedFlow.getIdentifier())).thenReturn(storedFlow);
+
+        registryService.createFlowSnapshot(fixture);
+    }
+
+    /**
+     * Expects an IllegalStateException from createFlowSnapshot when the stored flow has a
+     * version 1 snapshot and the new snapshot asks for version 100.
+     */
+    @Test(expected = IllegalStateException.class)
+    public void testCreateSnapshotVersionNotNextVersion() {
+        final VersionedFlowSnapshot fixture = createSnapshot();
+
+        final BucketMetadata bucket = new StandardBucketMetadata.Builder()
+                .identifier(fixture.getSnapshotMetadata().getBucketIdentifier())
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getBucketById(bucket.getIdentifier())).thenReturn(bucket);
+
+        // stored snapshot at version 1
+        final FlowSnapshotMetadata versionOne = new StandardFlowSnapshotMetadata.Builder()
+                .flowIdentifier(fixture.getSnapshotMetadata().getFlowIdentifier())
+                .flowName(fixture.getSnapshotMetadata().getFlowName())
+                .bucketIdentifier(fixture.getSnapshotMetadata().getBucketIdentifier())
+                .version(1)
+                .comments("This is an existing snapshot")
+                .created(System.currentTimeMillis())
+                .build();
+
+        // the flow lookup yields a flow that already contains version 1
+        final FlowMetadata storedFlow = new StandardFlowMetadata.Builder()
+                .identifier(fixture.getSnapshotMetadata().getFlowIdentifier())
+                .name("My Flow 1")
+                .description("This is my flow.")
+                .bucketIdentifier(fixture.getSnapshotMetadata().getBucketIdentifier())
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .addSnapshot(versionOne)
+                .build();
+
+        when(metadataProvider.getFlowById(storedFlow.getIdentifier())).thenReturn(storedFlow);
+
+        // request a version that skips well past the stored one
+        fixture.getSnapshotMetadata().setVersion(100);
+        registryService.createFlowSnapshot(fixture);
+    }
+
+    @Test
+    public void testCreateFirstSnapshot() {
+        final VersionedFlowSnapshot snapshot = createSnapshot();
+
+        final BucketMetadata existingBucket = new StandardBucketMetadata.Builder()
+                .identifier(snapshot.getSnapshotMetadata().getBucketIdentifier())
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket);
+
+        // return a flow with no snapshots added to it when getFlowById is called
+        final FlowMetadata existingFlow = new StandardFlowMetadata.Builder()
+                .identifier(snapshot.getSnapshotMetadata().getFlowIdentifier())
+                .name("My Flow 1")
+                .description("This is my flow.")
+                .bucketIdentifier(snapshot.getSnapshotMetadata().getBucketIdentifier())
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getFlowById(existingFlow.getIdentifier())).thenReturn(existingFlow);
+
+        final VersionedFlowSnapshot createdSnapshot = registryService.createFlowSnapshot(snapshot);
+        assertNotNull(createdSnapshot);
+
+        verify(snapshotSerializer, times(1)).serialize(eq(snapshot), any(OutputStream.class));
+        verify(flowPersistenceProvider, times(1)).saveSnapshot(any(), any());
+        verify(metadataProvider, times(1)).createFlowSnapshot(any(FlowSnapshotMetadata.class));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testCreateFirstSnapshotWithBadVersion() {
+        final VersionedFlowSnapshot snapshot = createSnapshot();
+
+        final BucketMetadata existingBucket = new StandardBucketMetadata.Builder()
+                .identifier(snapshot.getSnapshotMetadata().getBucketIdentifier())
+                .name("My Bucket #1")
+                .description("This is my bucket.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getBucketById(existingBucket.getIdentifier())).thenReturn(existingBucket);
+
+        // return a flow with no snapshots added to it when getFlowById is called
+        final FlowMetadata existingFlow = new StandardFlowMetadata.Builder()
+                .identifier(snapshot.getSnapshotMetadata().getFlowIdentifier())
+                .name("My Flow 1")
+                .description("This is my flow.")
+                .bucketIdentifier(snapshot.getSnapshotMetadata().getBucketIdentifier())
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getFlowById(existingFlow.getIdentifier())).thenReturn(existingFlow);
+
+        // set the first version to something other than 1
+        snapshot.getSnapshotMetadata().setVersion(100);
+        registryService.createFlowSnapshot(snapshot);
+    }
+
+    @Test(expected = ResourceNotFoundException.class)
+    public void testGetSnapshotDoesNotExistInMetadataProvider() {
+        final String flowId = "flow1";
+        final Integer version = 1;
+        when(metadataProvider.getFlowSnapshot(flowId, version)).thenReturn(null);
+        registryService.getFlowSnapshot(flowId, version);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testGetSnapshotDoesNotExistInPersistenceProvider() {
+        final FlowSnapshotMetadata existingSnapshot = new StandardFlowSnapshotMetadata.Builder()
+                .bucketIdentifier("b1")
+                .flowIdentifier("flow1")
+                .flowName("Flow 1")
+                .version(1)
+                .comments("This is snapshot 1")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getFlowSnapshot(existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion()))
+                .thenReturn(existingSnapshot);
+
+        when(flowPersistenceProvider.getSnapshot(
+                existingSnapshot.getBucketIdentifier(),
+                existingSnapshot.getFlowIdentifier(),
+                existingSnapshot.getVersion()
+        )).thenReturn(null);
+
+        registryService.getFlowSnapshot(existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion());
+    }
+
+    @Test
+    public void testGetSnapshotExists() {
+        final FlowSnapshotMetadata existingSnapshot = new StandardFlowSnapshotMetadata.Builder()
+                .bucketIdentifier("b1")
+                .flowIdentifier("flow1")
+                .flowName("Flow 1")
+                .version(1)
+                .comments("This is snapshot 1")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getFlowSnapshot(existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion()))
+                .thenReturn(existingSnapshot);
+
+        // return a non-null, non-zero-length array so something gets passed to the serializer
+        when(flowPersistenceProvider.getSnapshot(
+                existingSnapshot.getBucketIdentifier(),
+                existingSnapshot.getFlowIdentifier(),
+                existingSnapshot.getVersion()
+        )).thenReturn(new byte[10]);
+
+        final VersionedFlowSnapshot snapshotToDeserialize = createSnapshot();
+        when(snapshotSerializer.deserialize(any(InputStream.class))).thenReturn(snapshotToDeserialize);
+
+        final VersionedFlowSnapshot returnedSnapshot = registryService.getFlowSnapshot(
+                existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion());
+        assertNotNull(returnedSnapshot);
+    }
+
+    @Test(expected = ResourceNotFoundException.class)
+    public void testDeleteSnapshotDoesNotExist() {
+        final String flowId = "flow1";
+        final Integer version = 1;
+        when(metadataProvider.getFlowSnapshot(flowId, version)).thenReturn(null);
+        registryService.deleteFlowSnapshot(flowId, version);
+    }
+
+    @Test
+    public void testDeleteSnapshotExists() {
+        final FlowSnapshotMetadata existingSnapshot = new StandardFlowSnapshotMetadata.Builder()
+                .bucketIdentifier("b1")
+                .flowIdentifier("flow1")
+                .flowName("Flow 1")
+                .version(1)
+                .comments("This is snapshot 1")
+                .created(System.currentTimeMillis())
+                .build();
+
+        when(metadataProvider.getFlowSnapshot(existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion()))
+                .thenReturn(existingSnapshot);
+
+        final VersionedFlowSnapshotMetadata deletedSnapshot = registryService.deleteFlowSnapshot(
+                existingSnapshot.getFlowIdentifier(), existingSnapshot.getVersion());
+        assertNotNull(deletedSnapshot);
+        assertEquals(existingSnapshot.getFlowIdentifier(), deletedSnapshot.getFlowIdentifier());
+
+        verify(flowPersistenceProvider, times(1)).deleteSnapshot(
+                existingSnapshot.getBucketIdentifier(),
+                existingSnapshot.getFlowIdentifier(),
+                existingSnapshot.getVersion()
+        );
+
+        verify(metadataProvider, times(1)).deleteFlowSnapshot(
+                existingSnapshot.getFlowIdentifier(),
+                existingSnapshot.getVersion()
+        );
+    }
+
+    // -------------------------------------------------------------------
+
+    /**
+     * @return an Answer that hands back the first argument of the intercepted call, cast to BucketMetadata
+     */
+    private Answer<BucketMetadata> createBucketAnswer() {
+        return (InvocationOnMock invocation) -> (BucketMetadata) invocation.getArguments()[0];
+    }
+
+    /**
+     * @return an Answer that hands back the first argument of the intercepted call, cast to BucketMetadata
+     */
+    private Answer<BucketMetadata> updateBucketAnswer() {
+        return (InvocationOnMock invocation) -> (BucketMetadata) invocation.getArguments()[0];
+    }
+
+    /**
+     * @return an Answer that hands back the second argument of the intercepted call, cast to FlowMetadata
+     */
+    private Answer<FlowMetadata> createFlowAnswer() {
+        return (InvocationOnMock invocation) -> (FlowMetadata) invocation.getArguments()[1];
+    }
+
+    /**
+     * @return an Answer that hands back the first argument of the intercepted call, cast to FlowMetadata
+     */
+    private Answer<FlowMetadata> updateFlowAnswer() {
+        return (InvocationOnMock invocation) -> (FlowMetadata) invocation.getArguments()[0];
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
index 727aae0..3ddcadc 100644
--- a/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
+++ b/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/metadata/MetadataProvider.java
@@ -42,9 +42,16 @@ public interface MetadataProvider extends Provider {
      * @param bucketIdentifier the id of the bucket to retrieve
      * @return the bucket with the given id, or null if it does not exist
      */
-    BucketMetadata getBucket(String bucketIdentifier);
+    BucketMetadata getBucketById(String bucketIdentifier);
 
     /**
+     * Retrieves the bucket with the given name. The name comparison must be case-insensitive.
+     *
+     * @param name the name of the bucket to retrieve
+     * @return the bucket with the given name, or null if it does not exist
+     */
+    BucketMetadata getBucketByName(String name);
+    /**
      * Updates the given bucket, only the name and description should be allowed to be updated.
      *
      * @param bucket the updated bucket to save
@@ -53,7 +60,7 @@ public interface MetadataProvider extends Provider {
     BucketMetadata updateBucket(BucketMetadata bucket);
 
     /**
-     * Deletes the bucket with the given identifier if one exists.
+     * Deletes the bucket with the given identifier, as well as any objects that reference the bucket.
      *
      * @param bucketIdentifier the id of the bucket to delete
      */
@@ -82,7 +89,15 @@ public interface MetadataProvider extends Provider {
      * @param flowIdentifier the identifier of the flow to retrieve
      * @return the versioned flow with the given id, or null if no flow with the given id exists
      */
-    FlowMetadata getFlow(String flowIdentifier);
+    FlowMetadata getFlowById(String flowIdentifier);
+
+    /**
+     * Retrieves the versioned flow with the given name. The name comparison must be case-insensitive.
+     *
+     * @param name the name of the flow to retrieve
+     * @return the versioned flow with the given name, or null if no flow with the given name exists
+     */
+    FlowMetadata getFlowByName(String name);
 
     /**
      * Updates the given versioned flow, only the name and description should be allowed to be updated.

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
index 05ddd01..dd3e1ec 100644
--- a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
+++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/FileSystemMetadataProvider.java
@@ -62,7 +62,7 @@ public class FileSystemMetadataProvider implements MetadataProvider {
         try {
             return JAXBContext.newInstance(JAXB_GENERATED_PATH, FileSystemMetadataProvider.class.getClassLoader());
         } catch (JAXBException e) {
-            throw new RuntimeException("Unable to create JAXBContext.");
+            throw new RuntimeException("Unable to create JAXBContext.", e);
         }
     }
 
@@ -156,23 +156,33 @@ public class FileSystemMetadataProvider implements MetadataProvider {
         metadata.getBuckets().getBucket().add(jaxbBucket);
 
         saveAndRefresh(metadata);
-        return metadataHolder.get().getBucketsBydId().get(bucket.getIdentifier());
+        return metadataHolder.get().getBucketsById().get(bucket.getIdentifier());
     }
 
     @Override
-    public BucketMetadata getBucket(final String bucketIdentifier) {
+    public BucketMetadata getBucketById(final String bucketIdentifier) {
         if (bucketIdentifier == null) {
             throw new IllegalArgumentException("Bucket Identifier cannot be null");
         }
 
         final MetadataHolder holder = metadataHolder.get();
-        return holder.getBucketsBydId().get(bucketIdentifier);
+        return holder.getBucketsById().get(bucketIdentifier);
+    }
+
+    @Override
+    public BucketMetadata getBucketByName(String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("Bucket Name cannot be null");
+        }
+
+        // the holder indexes buckets by lower-cased name, so lower-case the requested name before the lookup
+        return metadataHolder.get().getBucketsByName().get(name.toLowerCase());
     }
 
     @Override
     public Set<BucketMetadata> getBuckets() {
         final MetadataHolder holder = metadataHolder.get();
-        final Map<String,BucketMetadata> bucketsBydId = holder.getBucketsBydId();
+        final Map<String,BucketMetadata> bucketsBydId = holder.getBucketsById();
         return new HashSet<>(bucketsBydId.values());
     }
 
@@ -198,7 +208,7 @@ public class FileSystemMetadataProvider implements MetadataProvider {
         jaxbBucket.setDescription(bucket.getDescription());
 
         saveAndRefresh(holder.getMetadata());
-        return metadataHolder.get().getBucketsBydId().get(bucket.getIdentifier());
+        return metadataHolder.get().getBucketsById().get(bucket.getIdentifier());
     }
 
     @Override
@@ -251,7 +261,7 @@ public class FileSystemMetadataProvider implements MetadataProvider {
 
         final MetadataHolder holder = metadataHolder.get();
 
-        final BucketMetadata bucket = holder.getBucketsBydId().get(bucketIdentifier);
+        final BucketMetadata bucket = holder.getBucketsById().get(bucketIdentifier);
         if (bucket == null) {
             throw new IllegalStateException("Unable to create Versioned Flow because Bucket does not exist with id " + bucketIdentifier);
         }
@@ -272,7 +282,7 @@ public class FileSystemMetadataProvider implements MetadataProvider {
     }
 
     @Override
-    public FlowMetadata getFlow(final String flowIdentifier) {
+    public FlowMetadata getFlowById(final String flowIdentifier) {
         if (flowIdentifier == null) {
             throw new IllegalArgumentException("Flow Identifier cannot be null");
         }
@@ -282,6 +292,17 @@ public class FileSystemMetadataProvider implements MetadataProvider {
     }
 
     @Override
+    public FlowMetadata getFlowByName(final String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("Flow Name cannot be null");
+        }
+
+        // the holder indexes flows by lower-cased name, so lower-case the requested name before the lookup
+        return metadataHolder.get().getFlowsByName().get(name.toLowerCase());
+    }
+
+
+    @Override
     public Set<FlowMetadata> getFlows() {
         final MetadataHolder holder = metadataHolder.get();
         final Map<String,FlowMetadata> flowsById = holder.getFlowsById();

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java
index 4264b01..e49a62b 100644
--- a/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java
+++ b/nifi-registry-provider-impl/src/main/java/org/apache/nifi/registry/metadata/MetadataHolder.java
@@ -36,13 +36,17 @@ public class MetadataHolder {
     private final Metadata metadata;
     private final Map<String,Set<FlowMetadata>> flowsByBucket;
     private final Map<String,FlowMetadata> flowsById;
+    private final Map<String,FlowMetadata> flowsByName;
     private final Map<String,BucketMetadata> bucketsById;
+    private final Map<String,BucketMetadata> bucketsByName;
 
     public MetadataHolder(final Metadata metadata) {
         this.metadata = metadata;
         this.flowsByBucket = Collections.unmodifiableMap(createFlowsByBucket(metadata));
+        this.flowsByName = Collections.unmodifiableMap(createFlowsByName(flowsByBucket));
         this.flowsById = Collections.unmodifiableMap(createFlowsById(flowsByBucket));
         this.bucketsById = Collections.unmodifiableMap(createBucketsBydId(metadata, flowsByBucket));
+        this.bucketsByName = Collections.unmodifiableMap(createBucketsByName(bucketsById));
     }
 
     private Map<String,BucketMetadata> createBucketsBydId(final Metadata metadata, final Map<String,Set<FlowMetadata>> flowsByBucket) {
@@ -60,6 +64,12 @@ public class MetadataHolder {
         return bucketsById;
     }
 
+    private Map<String,BucketMetadata> createBucketsByName(Map<String,BucketMetadata> bucketsById) {
+        final Map<String,BucketMetadata> byLowerCaseName = new HashMap<>(); // key: lower-cased bucket name
+        bucketsById.values().forEach(bucket -> byLowerCaseName.put(bucket.getName().toLowerCase(), bucket));
+        return byLowerCaseName;
+    }
+
     private BucketMetadata createBucketMetadata(final Bucket jaxbBucket, final Set<FlowMetadata> bucketFlows) {
         return new StandardBucketMetadata.Builder()
                 .identifier(jaxbBucket.getIdentifier())
@@ -116,29 +126,49 @@ public class MetadataHolder {
     }
 
     private Map<String,FlowMetadata> createFlowsById(final Map<String,Set<FlowMetadata>> flowsByBucket) {
-        final Map<String,FlowMetadata> flowsBdId = new HashMap<>();
+        final Map<String,FlowMetadata> flowsById = new HashMap<>();
 
         for (final Map.Entry<String,Set<FlowMetadata>> entry : flowsByBucket.entrySet()) {
             for (final FlowMetadata flowMetadata : entry.getValue()) {
-                flowsBdId.put(flowMetadata.getIdentifier(), flowMetadata);
+                flowsById.put(flowMetadata.getIdentifier(), flowMetadata);
+            }
+        }
+
+        return flowsById;
+    }
+
+    private Map<String,FlowMetadata> createFlowsByName(final Map<String,Set<FlowMetadata>> flowsByBucket) {
+        final Map<String,FlowMetadata> flowsByName = new HashMap<>();
+
+        for (final Map.Entry<String,Set<FlowMetadata>> entry : flowsByBucket.entrySet()) {
+            for (final FlowMetadata flow : entry.getValue()) {
+                flowsByName.put(flow.getName().toLowerCase(), flow);
             }
         }
 
-        return flowsBdId;
+        return flowsByName;
     }
 
     public Metadata getMetadata() {
         return metadata;
     }
 
-    public Map<String,BucketMetadata> getBucketsBydId() {
+    public Map<String,BucketMetadata> getBucketsById() {
         return bucketsById;
     }
 
+    public Map<String,BucketMetadata> getBucketsByName() {
+        return bucketsByName;
+    }
+
     public Map<String,FlowMetadata> getFlowsById() {
         return flowsById;
     }
 
+    public Map<String,FlowMetadata> getFlowsByName() {
+        return flowsByName;
+    }
+
     public Map<String,Set<FlowMetadata>> getFlowsByBucket() {
         return flowsByBucket;
     }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java b/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java
index 753ca6e..5c823b6 100644
--- a/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java
+++ b/nifi-registry-provider-impl/src/test/java/org/apache/nifi/registry/metadata/TestFileSystemMetadataProvider.java
@@ -111,15 +111,15 @@ public class TestFileSystemMetadataProvider {
         assertEquals(2, metadataProvider.getBuckets().size());
         assertEquals(1, metadataProvider.getFlows().size());
 
-        final BucketMetadata bucket1 = metadataProvider.getBucket("bucket1");
+        final BucketMetadata bucket1 = metadataProvider.getBucketById("bucket1");
         assertNotNull(bucket1);
         assertEquals("bucket1", bucket1.getIdentifier());
 
-        final BucketMetadata bucket2 = metadataProvider.getBucket("bucket2");
+        final BucketMetadata bucket2 = metadataProvider.getBucketById("bucket2");
         assertNotNull(bucket2);
         assertEquals("bucket2", bucket2.getIdentifier());
 
-        final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1");
+        final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1");
         assertNotNull(flowMetadata);
         assertEquals("flow1", flowMetadata.getIdentifier());
     }
@@ -150,11 +150,11 @@ public class TestFileSystemMetadataProvider {
     }
 
     @Test
-    public void testGetBucketExists() {
+    public void testGetBucketByIdExists() {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
         assertEquals(2, metadataProvider.getBuckets().size());
 
-        final BucketMetadata bucket1 = metadataProvider.getBucket("bucket1");
+        final BucketMetadata bucket1 = metadataProvider.getBucketById("bucket1");
         assertNotNull(bucket1);
         assertEquals("bucket1", bucket1.getIdentifier());
         assertEquals("Bryan's Bucket", bucket1.getName());
@@ -163,11 +163,46 @@ public class TestFileSystemMetadataProvider {
     }
 
     @Test
-    public void testGetBucketDoesNotExist() {
+    public void testGetBucketByIdDoesNotExist() {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
         assertEquals(2, metadataProvider.getBuckets().size());
 
-        final BucketMetadata bucket1 = metadataProvider.getBucket("bucket-does-not-exist");
+        final BucketMetadata bucket1 = metadataProvider.getBucketById("bucket-does-not-exist");
+        assertNull(bucket1);
+    }
+
+    @Test
+    public void testGetBucketByNameExists() {
+        metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
+        assertEquals(2, metadataProvider.getBuckets().size());
+
+        final BucketMetadata bucket1 = metadataProvider.getBucketByName("Bryan's Bucket");
+        assertNotNull(bucket1);
+        assertEquals("bucket1", bucket1.getIdentifier());
+        assertEquals("Bryan's Bucket", bucket1.getName());
+        assertEquals("The description for Bryan's Bucket.", bucket1.getDescription());
+        assertEquals(111111111, bucket1.getCreatedTimestamp());
+    }
+
+    @Test
+    public void testGetBucketByNameCaseInsensitive() {
+        metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
+        assertEquals(2, metadataProvider.getBuckets().size());
+
+        final BucketMetadata bucket1 = metadataProvider.getBucketByName("bryan's bucket");
+        assertNotNull(bucket1);
+        assertEquals("bucket1", bucket1.getIdentifier());
+        assertEquals("Bryan's Bucket", bucket1.getName());
+        assertEquals("The description for Bryan's Bucket.", bucket1.getDescription());
+        assertEquals(111111111, bucket1.getCreatedTimestamp());
+    }
+
+    @Test
+    public void testGetBucketByNameDoesNotExist() {
+        metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
+        assertEquals(2, metadataProvider.getBuckets().size());
+
+        final BucketMetadata bucket1 = metadataProvider.getBucketByName("bucket-does-not-exist");
         assertNull(bucket1);
     }
 
@@ -176,7 +211,7 @@ public class TestFileSystemMetadataProvider {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
         assertEquals(2, metadataProvider.getBuckets().size());
 
-        final BucketMetadata bucket = metadataProvider.getBucket("bucket1");
+        final BucketMetadata bucket = metadataProvider.getBucketById("bucket1");
         assertNotNull(bucket);
 
         final BucketMetadata updatedBucket = new StandardBucketMetadata.Builder(bucket)
@@ -212,14 +247,14 @@ public class TestFileSystemMetadataProvider {
         assertEquals(2, metadataProvider.getBuckets().size());
 
         final String bucketId = "bucket1";
-        assertNotNull(metadataProvider.getBucket(bucketId));
+        assertNotNull(metadataProvider.getBucketById(bucketId));
 
         final Set<FlowMetadata> bucketFlows = metadataProvider.getFlows(bucketId);
         assertNotNull(bucketFlows);
         assertEquals(1, bucketFlows.size());
 
         metadataProvider.deleteBucket(bucketId);
-        assertNull(metadataProvider.getBucket(bucketId));
+        assertNull(metadataProvider.getBucketById(bucketId));
 
         final Set<FlowMetadata> bucketFlows2 = metadataProvider.getFlows(bucketId);
         assertNotNull(bucketFlows2);
@@ -243,7 +278,7 @@ public class TestFileSystemMetadataProvider {
         assertEquals(2, metadataProvider.getBuckets().size());
 
         // verify bucket2 exists and has no flows
-        final BucketMetadata bucket2 = metadataProvider.getBucket("bucket2");
+        final BucketMetadata bucket2 = metadataProvider.getBucketById("bucket2");
         assertNotNull(bucket2);
         assertEquals(0, metadataProvider.getFlows(bucket2.getIdentifier()).size());
 
@@ -268,10 +303,10 @@ public class TestFileSystemMetadataProvider {
     }
 
     @Test
-    public void testGetFlowExists() {
+    public void testGetFlowByIdExists() {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
 
-        final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1");
+        final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1");
         assertNotNull(flowMetadata);
         assertEquals("flow1", flowMetadata.getIdentifier());
         assertEquals("Bryan's Flow", flowMetadata.getName());
@@ -283,10 +318,46 @@ public class TestFileSystemMetadataProvider {
     }
 
     @Test
-    public void testGetFlowDoesNotExist() {
+    public void testGetFlowByIdDoesNotExist() {
+        metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
+
+        final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow-does-not-exist");
+        assertNull(flowMetadata);
+    }
+
+    @Test
+    public void testGetFlowByNameExists() {
+        metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
+
+        final FlowMetadata flowMetadata = metadataProvider.getFlowByName("Bryan's Flow");
+        assertNotNull(flowMetadata);
+        assertEquals("flow1", flowMetadata.getIdentifier());
+        assertEquals("Bryan's Flow", flowMetadata.getName());
+        assertEquals("The description for Bryan's Flow.", flowMetadata.getDescription());
+        assertEquals(333333333, flowMetadata.getCreatedTimestamp());
+        assertEquals(444444444, flowMetadata.getModifiedTimestamp());
+        assertEquals(3, flowMetadata.getSnapshotMetadata().size());
+    }
+
+    @Test
+    public void testGetFlowByNameCaseInsensitive() {
+        metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
+
+        final FlowMetadata flowMetadata = metadataProvider.getFlowByName("bryan's flow");
+        assertNotNull(flowMetadata);
+        assertEquals("flow1", flowMetadata.getIdentifier());
+        assertEquals("Bryan's Flow", flowMetadata.getName());
+        assertEquals("The description for Bryan's Flow.", flowMetadata.getDescription());
+        assertEquals(333333333, flowMetadata.getCreatedTimestamp());
+        assertEquals(444444444, flowMetadata.getModifiedTimestamp());
+        assertEquals(3, flowMetadata.getSnapshotMetadata().size());
+    }
+
+    @Test
+    public void testGetFlowByNameDoesNotExist() {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
 
-        final FlowMetadata flowMetadata = metadataProvider.getFlow("flow-does-not-exist");
+        final FlowMetadata flowMetadata = metadataProvider.getFlowByName("flow-does-not-exist");
         assertNull(flowMetadata);
     }
 
@@ -294,7 +365,7 @@ public class TestFileSystemMetadataProvider {
     public void testUpdateFlowExists() {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
 
-        final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1");
+        final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1");
         assertNotNull(flowMetadata);
 
         final String newFlowName = "New Flow Name";
@@ -334,14 +405,14 @@ public class TestFileSystemMetadataProvider {
     public void testDeleteFlowWithSnapshots() {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
 
-        final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1");
+        final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1");
         assertNotNull(flowMetadata);
         assertNotNull(flowMetadata.getSnapshotMetadata());
         assertTrue(flowMetadata.getSnapshotMetadata().size() > 0);
 
         metadataProvider.deleteFlow(flowMetadata.getIdentifier());
 
-        final FlowMetadata deletedFlow = metadataProvider.getFlow("flow1");
+        final FlowMetadata deletedFlow = metadataProvider.getFlowById("flow1");
         assertNull(deletedFlow);
     }
 
@@ -358,7 +429,7 @@ public class TestFileSystemMetadataProvider {
     public void testCreateFlowSnapshot() {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
 
-        final FlowMetadata versionedFlow = metadataProvider.getFlow("flow1");
+        final FlowMetadata versionedFlow = metadataProvider.getFlowById("flow1");
         assertNotNull(versionedFlow);
         assertNotNull(versionedFlow.getSnapshotMetadata());
 
@@ -384,7 +455,7 @@ public class TestFileSystemMetadataProvider {
         assertEquals(nextSnapshot.getComments(), createdSnapshot.getComments());
         assertEquals(nextSnapshot.getCreatedTimestamp(), createdSnapshot.getCreatedTimestamp());
 
-        final FlowMetadata updatedFlow = metadataProvider.getFlow("flow1");
+        final FlowMetadata updatedFlow = metadataProvider.getFlowById("flow1");
         assertNotNull(updatedFlow);
         assertNotNull(updatedFlow.getSnapshotMetadata());
         assertEquals(updatedFlow.getSnapshotMetadata().size(), versionedFlow.getSnapshotMetadata().size() + 1);
@@ -439,7 +510,7 @@ public class TestFileSystemMetadataProvider {
     public void testDeleteSnapshotExists() {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
 
-        final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1");
+        final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1");
         assertNotNull(flowMetadata);
         assertNotNull(flowMetadata.getSnapshotMetadata());
         assertEquals(3, flowMetadata.getSnapshotMetadata().size());
@@ -449,7 +520,7 @@ public class TestFileSystemMetadataProvider {
 
         metadataProvider.deleteFlowSnapshot(flowMetadata.getIdentifier(), firstSnapshot.getVersion());
 
-        final FlowMetadata updatedFlow = metadataProvider.getFlow("flow1");
+        final FlowMetadata updatedFlow = metadataProvider.getFlowById("flow1");
         assertNotNull(updatedFlow);
         assertNotNull(updatedFlow.getSnapshotMetadata());
         assertEquals(2, updatedFlow.getSnapshotMetadata().size());
@@ -463,14 +534,14 @@ public class TestFileSystemMetadataProvider {
     public void testDeleteSnapshotDoesNotExist() {
         metadataProvider.onConfigured(createConfigContext(METADATA_DEST_EXISTING));
 
-        final FlowMetadata flowMetadata = metadataProvider.getFlow("flow1");
+        final FlowMetadata flowMetadata = metadataProvider.getFlowById("flow1");
         assertNotNull(flowMetadata);
         assertNotNull(flowMetadata.getSnapshotMetadata());
         assertEquals(3, flowMetadata.getSnapshotMetadata().size());
 
         metadataProvider.deleteFlowSnapshot(flowMetadata.getIdentifier(), Integer.MAX_VALUE);
 
-        final FlowMetadata updatedFlow = metadataProvider.getFlow("flow1");
+        final FlowMetadata updatedFlow = metadataProvider.getFlowById("flow1");
         assertNotNull(updatedFlow);
         assertNotNull(updatedFlow.getSnapshotMetadata());
         assertEquals(3, updatedFlow.getSnapshotMetadata().size());

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-web-api/pom.xml b/nifi-registry-web-api/pom.xml
index b1a7f07..74c6ba3 100644
--- a/nifi-registry-web-api/pom.xml
+++ b/nifi-registry-web-api/pom.xml
@@ -85,11 +85,6 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi.registry</groupId>
-            <artifactId>nifi-registry-data-model</artifactId>
-            <version>0.0.1-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.nifi.registry</groupId>
             <artifactId>nifi-registry-properties</artifactId>
             <version>0.0.1-SNAPSHOT</version>
             <scope>provided</scope>
@@ -126,9 +121,12 @@
             <artifactId>jersey-media-json-jackson</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.glassfish.jersey.ext</groupId>
+            <artifactId>jersey-bean-validation</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.swagger</groupId>
             <artifactId>swagger-annotations</artifactId>
-            <version>1.5.16</version>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java
----------------------------------------------------------------------
diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java
index e2fabdc..02ece9d 100644
--- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java
+++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/NiFiRegistryResourceConfig.java
@@ -17,19 +17,32 @@
 package org.apache.nifi.registry.web;
 
 import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
 import org.apache.nifi.registry.metadata.MetadataProvider;
 import org.apache.nifi.registry.properties.NiFiRegistryProperties;
 import org.apache.nifi.registry.provider.ProviderFactory;
 import org.apache.nifi.registry.provider.StandardProviderFactory;
+import org.apache.nifi.registry.serialization.FlowSnapshotSerializer;
+import org.apache.nifi.registry.serialization.Serializer;
+import org.apache.nifi.registry.service.RegistryService;
+import org.apache.nifi.registry.web.api.BucketFlowResource;
+import org.apache.nifi.registry.web.api.BucketResource;
+import org.apache.nifi.registry.web.api.FlowResource;
 import org.apache.nifi.registry.web.api.TestResource;
 import org.apache.nifi.registry.web.mapper.IllegalArgumentExceptionMapper;
+import org.apache.nifi.registry.web.mapper.IllegalStateExceptionMapper;
+import org.apache.nifi.registry.web.mapper.ResourceNotFoundExceptionMapper;
 import org.apache.nifi.registry.web.mapper.ThrowableMapper;
 import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.server.ServerProperties;
 import org.glassfish.jersey.server.filter.HttpMethodOverrideFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.servlet.ServletContext;
+import javax.validation.Validation;
+import javax.validation.Validator;
+import javax.validation.ValidatorFactory;
 import javax.ws.rs.core.Context;
 
 public class NiFiRegistryResourceConfig extends ResourceConfig {
@@ -39,17 +52,37 @@ public class NiFiRegistryResourceConfig extends ResourceConfig {
     public NiFiRegistryResourceConfig(@Context ServletContext servletContext) {
         final NiFiRegistryProperties properties = (NiFiRegistryProperties) servletContext.getAttribute("nifi-registry.properties");
 
+        // create the providers
         final ProviderFactory providerFactory = new StandardProviderFactory(properties);
+        providerFactory.initialize();
+
         final MetadataProvider metadataProvider = providerFactory.getMetadataProvider();
         final FlowPersistenceProvider flowPersistenceProvider = providerFactory.getFlowPersistenceProvider();
 
+        // create a serializer for flow snapshots
+        final Serializer<VersionedFlowSnapshot> snapshotSerializer = new FlowSnapshotSerializer();
+
+        // create a validator
+        final ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
+        final Validator validator = validatorFactory.getValidator();
+
+        // create the main services that the REST resources will use
+        final RegistryService registryService = new RegistryService(metadataProvider, flowPersistenceProvider, snapshotSerializer, validator);
+
         register(HttpMethodOverrideFilter.class);
 
         // register the exception mappers
         register(new IllegalArgumentExceptionMapper());
+        register(new IllegalStateExceptionMapper());
+        register(new ResourceNotFoundExceptionMapper());
         register(new ThrowableMapper());
 
         // register endpoints
         register(new TestResource(metadataProvider, flowPersistenceProvider));
+        register(new BucketResource(registryService));
+        register(new BucketFlowResource(registryService));
+        register(new FlowResource(registryService));
+
+        property(ServerProperties.BV_SEND_ERROR_IN_RESPONSE, true);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java
----------------------------------------------------------------------
diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java
index fc838fd..218205b 100644
--- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java
+++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java
@@ -19,6 +19,7 @@ package org.apache.nifi.registry.web.api;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.service.RegistryService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +40,12 @@ public class BucketFlowResource {
 
     private static final Logger logger = LoggerFactory.getLogger(BucketFlowResource.class);
 
+    private final RegistryService registryService;
+
+    public BucketFlowResource(final RegistryService registryService) {
+        this.registryService = registryService;
+    }
+
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
@@ -47,10 +54,9 @@ public class BucketFlowResource {
                     "The flow id is created by the server and a location URI for the created flow resource is returned.",
             response = VersionedFlow.class
     )
-    public Response createFlow(@PathParam("bucketId") String bucketId) {
-        // TODO implement createFlow
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response createFlow(@PathParam("bucketId") final String bucketId, final VersionedFlow flow) {
+        final VersionedFlow createdFlow = registryService.createFlow(bucketId, flow);
+        return Response.status(Response.Status.OK).entity(createdFlow).build();
     }
 
     /* TODO, add redirection URIs so that GET, PUT, DELETE operations for a given flow id (once created)

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java
----------------------------------------------------------------------
diff --git a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java
index 25f62f5..797ea9e 100644
--- a/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java
+++ b/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java
@@ -20,7 +20,9 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.service.RegistryService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,6 +38,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
+import java.util.Set;
 
 @Path("/buckets")
 @Api(
@@ -50,6 +53,12 @@ public class BucketResource {
     @Context
     UriInfo uriInfo;
 
+    private final RegistryService registryService;
+
+    public BucketResource(final RegistryService registryService) {
+        this.registryService = registryService;
+    }
+
     @POST
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
@@ -57,10 +66,9 @@ public class BucketResource {
             value = "Create a named bucket capable of storing NiFi bucket objects such as flows and extension bundles.",
             response = Bucket.class
     )
-    public Response createBucket(Bucket bucket) {
-        // TODO implement createBucket
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response createBucket(final Bucket bucket) {
+        final Bucket createdBucket = registryService.createBucket(bucket);
+        return Response.status(Response.Status.OK).entity(createdBucket).build();
     }
 
     @GET
@@ -72,9 +80,8 @@ public class BucketResource {
             responseContainer = "List"
     )
     public Response getBuckets() {
-        // TODO implement getBuckets
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+        final Set<Bucket> buckets = registryService.getBuckets();
+        return Response.status(Response.Status.OK).entity(buckets).build();
     }
 
     @GET
@@ -90,10 +97,9 @@ public class BucketResource {
                     @ApiResponse(code = 404, message = "The specified resource could not be found."),
             }
     )
-    public Response getBucket(@PathParam("bucketId") String bucketId) {
-        // TODO implement getBucket
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response getBucket(@PathParam("bucketId") final String bucketId) {
+        final Bucket bucket = registryService.getBucket(bucketId);
+        return Response.status(Response.Status.OK).entity(bucket).build();
     }
 
     @PUT
@@ -109,10 +115,25 @@ public class BucketResource {
                     @ApiResponse(code = 404, message = "The specified resource could not be found."),
             }
     )
-    public Response updateBucket(@PathParam("bucketId") String bucketId) {
-        // TODO implement updateBucket
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response updateBucket(@PathParam("bucketId") final String bucketId, final Bucket bucket) {
+        if (StringUtils.isBlank(bucketId)) {
+            throw new IllegalArgumentException("Bucket Id cannot be blank");
+        }
+
+        if (bucket == null) {
+            throw new IllegalArgumentException("Bucket cannot be null");
+        }
+
+        if (bucket.getIdentifier() != null && !bucketId.equals(bucket.getIdentifier())) {
+            throw new IllegalArgumentException("Bucket id in path param must match bucket id in body");
+        }
+
+        if (bucket.getIdentifier() == null) {
+            bucket.setIdentifier(bucketId);
+        }
+
+        final Bucket updatedBucket = registryService.updateBucket(bucket);
+        return Response.status(Response.Status.OK).entity(updatedBucket).build();
     }
 
     @DELETE
@@ -128,10 +149,9 @@ public class BucketResource {
                     @ApiResponse(code = 404, message = "The specified resource could not be found."),
             }
     )
-    public Response deleteBucket(@PathParam("bucketId") String bucketId) {
-        // TODO implement deleteBucket
-        logger.error("This API functionality has not yet been implemented.");
-        return Response.status(Response.Status.NOT_IMPLEMENTED).build();
+    public Response deleteBucket(@PathParam("bucketId") final String bucketId) {
+        final Bucket deletedBucket = registryService.deleteBucket(bucketId);
+        return Response.status(Response.Status.OK).entity(deletedBucket).build();
     }
 
 }


[3/3] nifi-registry git commit: NIFIREG-10 Implementing Registry service layer and connecting to REST end-points - Adding FlowSnapshotSerializer and JAXBFlowSnapshotSerializer - Adding bean validation 2.0.0 using Hibernate validator as the implementation

Posted by bb...@apache.org.
NIFIREG-10 Implementing Registry service layer and connecting to REST end-points
- Adding FlowSnapshotSerializer and JAXBFlowSnapshotSerializer
- Adding bean validation 2.0.0 using Hibernate validator as the implementation
- Adding a test that can be run manually to exercise some of the REST API

This closes #7.


Project: http://git-wip-us.apache.org/repos/asf/nifi-registry/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-registry/commit/a1629c86
Tree: http://git-wip-us.apache.org/repos/asf/nifi-registry/tree/a1629c86
Diff: http://git-wip-us.apache.org/repos/asf/nifi-registry/diff/a1629c86

Branch: refs/heads/master
Commit: a1629c86df62930f603f264495f63a2429af7f1a
Parents: 88cae4f
Author: Bryan Bende <bb...@apache.org>
Authored: Tue Aug 22 16:15:39 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Fri Sep 1 11:06:49 2017 -0400

----------------------------------------------------------------------
 build-and-run.sh                                |  28 +
 nifi-registry-data-model/pom.xml                |   5 +-
 .../org/apache/nifi/registry/bucket/Bucket.java |  16 +-
 .../apache/nifi/registry/bucket/BucketItem.java |  27 +-
 .../nifi/registry/flow/VersionedComponent.java  |   4 +
 .../nifi/registry/flow/VersionedFlow.java       |  12 +-
 .../registry/flow/VersionedFlowSnapshot.java    |  10 +
 .../flow/VersionedFlowSnapshotMetadata.java     |  13 +
 nifi-registry-framework/pom.xml                 |  19 +
 .../exception/ResourceNotFoundException.java    |  32 +
 .../flow/StandardFlowSnapshotContext.java       |  15 +
 .../provider/StandardProviderFactory.java       |   2 +-
 .../serialization/FlowSnapshotSerializer.java   | 101 ++
 .../serialization/SerializationException.java   |  35 +
 .../nifi/registry/serialization/Serializer.java |  43 +
 .../jaxb/JAXBFlowSnapshotSerializer.java        |  30 +
 .../serialization/jaxb/JAXBSerializer.java      |  80 ++
 .../nifi/registry/service/DataModelMapper.java  | 125 +++
 .../nifi/registry/service/RegistryService.java  | 511 ++++++++++
 .../registry/provider/MockMetadataProvider.java |  14 +-
 .../TestFlowSnapshotSerializer.java             |  66 ++
 .../jaxb/TestJAXBFlowSnapshotSerializer.java    |  63 ++
 .../registry/service/TestDataModelMapper.java   | 174 ++++
 .../registry/service/TestRegistryService.java   | 936 +++++++++++++++++++
 .../registry/metadata/MetadataProvider.java     |  21 +-
 .../metadata/FileSystemMetadataProvider.java    |  37 +-
 .../nifi/registry/metadata/MetadataHolder.java  |  38 +-
 .../TestFileSystemMetadataProvider.java         | 119 ++-
 nifi-registry-web-api/pom.xml                   |  10 +-
 .../web/NiFiRegistryResourceConfig.java         |  33 +
 .../registry/web/api/BucketFlowResource.java    |  14 +-
 .../nifi/registry/web/api/BucketResource.java   |  58 +-
 .../nifi/registry/web/api/FlowResource.java     | 121 ++-
 .../web/mapper/IllegalStateExceptionMapper.java |  44 +
 .../mapper/ResourceNotFoundExceptionMapper.java |  45 +
 .../mapper/SerializationExceptionMapper.java    |  45 +
 .../apache/nifi/registry/web/TestRestAPI.java   | 152 +++
 pom.xml                                         |  37 +
 38 files changed, 3014 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/build-and-run.sh
----------------------------------------------------------------------
diff --git a/build-and-run.sh b/build-and-run.sh
new file mode 100755
index 0000000..7c7d3d6
--- /dev/null
+++ b/build-and-run.sh
@@ -0,0 +1,28 @@
+#!/bin/sh
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+REGISTRY_SCRIPT=`find nifi-registry-assembly/target/ -name nifi-registry.sh | head -1`
+REGISTRY_BIN_DIR=$(dirname "${REGISTRY_SCRIPT}")
+REGISTRY_DIR=$REGISTRY_BIN_DIR/..
+
+./${REGISTRY_SCRIPT} stop
+
+mvn clean install -Pcontrib-check
+
+./${REGISTRY_SCRIPT} start
+
+tail -n 500 -f ${REGISTRY_DIR}/logs/nifi-registry-app.log

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/pom.xml b/nifi-registry-data-model/pom.xml
index dbdda37..438fe01 100644
--- a/nifi-registry-data-model/pom.xml
+++ b/nifi-registry-data-model/pom.xml
@@ -25,7 +25,10 @@
         <dependency>
             <groupId>io.swagger</groupId>
             <artifactId>swagger-annotations</artifactId>
-            <version>1.5.16</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.validation</groupId>
+            <artifactId>validation-api</artifactId>
         </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java
index 2797213..9928cd8 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/Bucket.java
@@ -20,18 +20,32 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import org.apache.nifi.registry.flow.VersionedFlow;
 
+import javax.validation.Valid;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotBlank;
+import javax.xml.bind.annotation.XmlRootElement;
 import java.util.Objects;
 import java.util.Set;
 
+@XmlRootElement
 @ApiModel(value = "bucket")
 public class Bucket {
 
+    @NotBlank
     private String identifier;
+
+    @NotBlank
     private String name;
+
+    @Min(1)
     private long createdTimestamp;
+
     private String description;
+
+    @Valid
     private Set<VersionedFlow> versionedFlows;
 
+
     @ApiModelProperty("The id of the bucket. This is set by the server at creation time.")
     public String getIdentifier() {
         return identifier;
@@ -68,7 +82,7 @@ public class Bucket {
         this.description = description;
     }
 
-    @ApiModelProperty("The versioned flows in the bucket.")
+    @ApiModelProperty(value = "The versioned flows in the bucket.", readOnly = true)
     public Set<VersionedFlow> getVersionedFlows() {
         return versionedFlows;
     }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java
index eec4fee..d57f07b 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/bucket/BucketItem.java
@@ -19,17 +19,36 @@ package org.apache.nifi.registry.bucket;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
 import java.util.Objects;
 
 @ApiModel("bucketItem")
-public class BucketItem {
+public abstract class BucketItem {
 
+    @NotBlank
     private String identifier;
+
+    @NotBlank
     private String name;
+
+    @NotBlank
     private String bucketIdentifier;
+
+    @Min(1)
     private long createdTimestamp;
+
+    @Min(1)
     private long modifiedTimestamp;
-    private BucketItemType type;
+
+    @NotNull
+    private final BucketItemType type;
+
+    public BucketItem(final BucketItemType type) {
+        this.type = type;
+    }
+
 
     @ApiModelProperty("An ID to uniquely identify this object.")
     public String getIdentifier() {
@@ -81,10 +100,6 @@ public class BucketItem {
         return type;
     }
 
-    public void setType(BucketItemType type) {
-        this.type = type;
-    }
-
     @Override
     public int hashCode() {
         return Objects.hashCode(this.identifier);

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java
index bef9557..af9b3d7 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedComponent.java
@@ -64,4 +64,8 @@ public abstract class VersionedComponent {
     }
 
     public abstract ComponentType getComponentType();
+
+    public void setComponentType(ComponentType type) {
+        // purposely do nothing here; this exists just to allow unmarshalling
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java
index d1b1108..8bbb040 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlow.java
@@ -19,7 +19,10 @@ package org.apache.nifi.registry.flow;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import org.apache.nifi.registry.bucket.BucketItem;
+import org.apache.nifi.registry.bucket.BucketItemType;
 
+import javax.validation.Valid;
+import javax.xml.bind.annotation.XmlRootElement;
 import java.util.SortedSet;
 
 /**
@@ -31,12 +34,19 @@ import java.util.SortedSet;
  *
  * @see VersionedFlowSnapshot
  */
+@XmlRootElement
 @ApiModel(value = "versionedFlow")
 public class VersionedFlow extends BucketItem {
 
     private String description;
+
+    @Valid
     private SortedSet<VersionedFlowSnapshotMetadata> snapshotMetadata;
 
+    public VersionedFlow() {
+        super(BucketItemType.FLOW);
+    }
+
     @ApiModelProperty("A description of the flow.")
     public String getDescription() {
         return description;
@@ -46,7 +56,7 @@ public class VersionedFlow extends BucketItem {
         this.description = description;
     }
 
-    @ApiModelProperty("The metadata for each snapshot of this flow.")
+    @ApiModelProperty(value = "The metadata for each snapshot of this flow.", readOnly = true)
     public SortedSet<VersionedFlowSnapshotMetadata> getSnapshotMetadata() {
         return snapshotMetadata;
     }

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
index c3b368c..9f5d973 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshot.java
@@ -20,6 +20,9 @@ package org.apache.nifi.registry.flow;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+import javax.xml.bind.annotation.XmlRootElement;
 import java.util.Objects;
 
 /**
@@ -30,12 +33,19 @@ import java.util.Objects;
  * version of the flow, the timestamp when it was saved, the contents of the flow, etc.
  * </p>
  */
+@XmlRootElement
 @ApiModel(value = "versionedFlowSnapshot")
 public class VersionedFlowSnapshot {
 
+    @Valid
+    @NotNull
     private VersionedFlowSnapshotMetadata snapshotMetadata;
+
+    @Valid
+    @NotNull
     private VersionedProcessGroup flowContents;
 
+
     @ApiModelProperty("The metadata for this snapshot")
     public VersionedFlowSnapshotMetadata getSnapshotMetadata() {
         return snapshotMetadata;

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java
----------------------------------------------------------------------
diff --git a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java
index 4cd3de9..e60dcfd 100644
--- a/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java
+++ b/nifi-registry-data-model/src/main/java/org/apache/nifi/registry/flow/VersionedFlowSnapshotMetadata.java
@@ -19,6 +19,8 @@ package org.apache.nifi.registry.flow;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotBlank;
 import java.util.Objects;
 
 /**
@@ -28,13 +30,24 @@ import java.util.Objects;
 @ApiModel(value = "versionedFlowSnapshot")
 public class VersionedFlowSnapshotMetadata implements Comparable<VersionedFlowSnapshotMetadata> {
 
+    @NotBlank
     private String bucketIdentifier;
+
+    @NotBlank
     private String flowIdentifier;
+
+    @NotBlank
     private String flowName;
+
+    @Min(1)
     private int version;
+
+    @Min(1)
     private long timestamp;
+
     private String comments;
 
+
     @ApiModelProperty("The identifier of the bucket this snapshot belongs to.")
     public String getBucketIdentifier() {
         return bucketIdentifier;

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/pom.xml b/nifi-registry-framework/pom.xml
index 4f36538..00bd2d6 100644
--- a/nifi-registry-framework/pom.xml
+++ b/nifi-registry-framework/pom.xml
@@ -62,6 +62,11 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi.registry</groupId>
+            <artifactId>nifi-registry-data-model</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi.registry</groupId>
             <artifactId>nifi-registry-provider-api</artifactId>
             <version>0.0.1-SNAPSHOT</version>
         </dependency>
@@ -71,10 +76,24 @@
             <version>0.0.1-SNAPSHOT</version>
         </dependency>
         <dependency>
+            <groupId>org.hibernate</groupId>
+            <artifactId>hibernate-validator</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish</groupId>
+            <artifactId>javax.el</artifactId>
+        </dependency>
+        <!-- Test Dependencies -->
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.12</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
new file mode 100644
index 0000000..a83e9e2
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.exception;
+
+/**
+ * Unchecked exception signalling that a requested entity could not be found.
+ */
+public class ResourceNotFoundException extends RuntimeException {
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public ResourceNotFoundException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates the exception with a detail message and the underlying cause.
+     *
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public ResourceNotFoundException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java
index 3b29e97..3527355 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/flow/StandardFlowSnapshotContext.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.registry.flow;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.bucket.Bucket;
 
 /**
  * Standard implementation of FlowSnapshotContext.
@@ -96,6 +97,34 @@ public class StandardFlowSnapshotContext implements FlowSnapshotContext {
         private String comments;
         private long snapshotTimestamp;
 
+        /**
+         * Creates an empty builder; values are supplied through the fluent setter methods.
+         */
+        public Builder() {
+
+        }
+
+        /**
+         * Creates a builder pre-populated from the given bucket and snapshot metadata.
+         *
+         * @param bucket the bucket supplying the bucket identifier and name
+         * @param snapshotMetadata the metadata supplying the flow identity, version, comments and timestamp
+         * @throws NullPointerException if bucket or snapshotMetadata is null
+         */
+        public Builder(final Bucket bucket, final VersionedFlowSnapshotMetadata snapshotMetadata) {
+            // Fail fast with a descriptive message instead of a bare NullPointerException further down.
+            Validate.notNull(bucket, "Bucket cannot be null");
+            Validate.notNull(snapshotMetadata, "Snapshot metadata cannot be null");
+
+            bucketId(bucket.getIdentifier());
+            bucketName(bucket.getName());
+            flowId(snapshotMetadata.getFlowIdentifier());
+            flowName(snapshotMetadata.getFlowName());
+            version(snapshotMetadata.getVersion());
+            comments(snapshotMetadata.getComments());
+            snapshotTimestamp(snapshotMetadata.getTimestamp());
+        }
+
         public Builder bucketId(final String bucketId) {
             this.bucketId = bucketId;
             return this;

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
index 1a83d68..03b5a86 100644
--- a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
@@ -58,7 +58,7 @@ public class StandardProviderFactory implements ProviderFactory {
         try {
             return JAXBContext.newInstance(JAXB_GENERATED_PATH, StandardProviderFactory.class.getClassLoader());
         } catch (JAXBException e) {
-            throw new RuntimeException("Unable to create JAXBContext.");
+            throw new RuntimeException("Unable to create JAXBContext.", e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java
new file mode 100644
index 0000000..ae2482b
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/FlowSnapshotSerializer.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.serialization.jaxb.JAXBFlowSnapshotSerializer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A serializer for VersionedFlowSnapshots that maps a "version" of the data model to a serializer. The version
+ * will be written to a header at the beginning of the OutputStream, and then the object and the OutputStream will
+ * be passed on to the real serializer for the given version. Similarly, when deserializing, the header will first be
+ * read from the InputStream to determine the version, and then the InputStream will be passed to the deserializer
+ * for the given version.
+ *
+ * <p>The header is {@link #HEADER_LENGTH} bytes long: the UTF-8 bytes of {@link #MAGIC_HEADER} followed by the
+ * version as a big-endian 4-byte integer.
+ */
+public class FlowSnapshotSerializer implements Serializer<VersionedFlowSnapshot> {
+
+    static final String MAGIC_HEADER = "Flows";
+    static final byte[] MAGIC_HEADER_BYTES = MAGIC_HEADER.getBytes(StandardCharsets.UTF_8);
+
+    static final Integer CURRENT_VERSION = 1;
+
+    // Magic bytes plus the 4-byte version; derived so the reader and the writer can never disagree.
+    static final int HEADER_LENGTH = MAGIC_HEADER_BYTES.length + Integer.BYTES;
+
+    private final Map<Integer, Serializer<VersionedFlowSnapshot>> serializersByVersion;
+
+    /**
+     * Registers the serializer for each supported data model version.
+     */
+    public FlowSnapshotSerializer() {
+        final Map<Integer, Serializer<VersionedFlowSnapshot>> tempSerializers = new HashMap<>();
+        tempSerializers.put(CURRENT_VERSION, new JAXBFlowSnapshotSerializer());
+        this.serializersByVersion = Collections.unmodifiableMap(tempSerializers);
+    }
+
+    /**
+     * Writes the header (magic bytes and current version) and then delegates to the serializer registered
+     * for the current version.
+     *
+     * @param versionedFlowSnapshot the snapshot to serialize
+     * @param out the output stream to write the header and the serialized snapshot to
+     * @throws SerializationException if no serializer is registered for the current version, the header
+     *         cannot be written, or the delegate fails
+     */
+    @Override
+    public void serialize(final VersionedFlowSnapshot versionedFlowSnapshot, final OutputStream out) throws SerializationException {
+        // Resolve the delegate first so that nothing is written when no serializer is registered.
+        final Serializer<VersionedFlowSnapshot> serializer = serializersByVersion.get(CURRENT_VERSION);
+        if (serializer == null) {
+            throw new SerializationException("No flow snapshot serializer for version " + CURRENT_VERSION);
+        }
+
+        final ByteBuffer byteBuffer = ByteBuffer.allocate(HEADER_LENGTH);
+        byteBuffer.put(MAGIC_HEADER_BYTES);
+        byteBuffer.putInt(CURRENT_VERSION);
+
+        try {
+            out.write(byteBuffer.array());
+        } catch (final IOException e) {
+            throw new SerializationException("Unable to write header while serializing snapshot", e);
+        }
+
+        serializer.serialize(versionedFlowSnapshot, out);
+    }
+
+    /**
+     * Reads and validates the header, then delegates to the serializer registered for the version it names.
+     *
+     * @param input the stream positioned at the start of the header
+     * @return the deserialized snapshot
+     * @throws SerializationException if the header is truncated, does not start with the magic bytes,
+     *         names an unknown version, or the delegate fails
+     */
+    @Override
+    public VersionedFlowSnapshot deserialize(final InputStream input) throws SerializationException {
+        final byte[] header = readHeader(input);
+
+        // Reject content that does not carry the magic bytes written by serialize().
+        final byte[] magic = Arrays.copyOf(header, MAGIC_HEADER_BYTES.length);
+        if (!Arrays.equals(MAGIC_HEADER_BYTES, magic)) {
+            throw new SerializationException("Unable to deserialize snapshot, header does not start with "
+                    + MAGIC_HEADER);
+        }
+
+        final int version = ByteBuffer.wrap(header).getInt(MAGIC_HEADER_BYTES.length);
+        final Serializer<VersionedFlowSnapshot> serializer = serializersByVersion.get(version);
+        if (serializer == null) {
+            throw new SerializationException("No flow snapshot serializer for version " + version);
+        }
+
+        return serializer.deserialize(input);
+    }
+
+    /**
+     * Reads exactly {@link #HEADER_LENGTH} bytes from the start of the given stream.
+     *
+     * @param input the stream to read the header from
+     * @return the header bytes
+     * @throws SerializationException if reading fails or the stream ends before the header is complete
+     */
+    private byte[] readHeader(final InputStream input) throws SerializationException {
+        final byte[] header = new byte[HEADER_LENGTH];
+        int bytesRead = 0;
+
+        try {
+            // A single read() may return fewer bytes than requested, so keep reading until full or EOF.
+            while (bytesRead < HEADER_LENGTH) {
+                final int count = input.read(header, bytesRead, HEADER_LENGTH - bytesRead);
+                if (count < 0) {
+                    break;
+                }
+                bytesRead += count;
+            }
+        } catch (final IOException e) {
+            throw new SerializationException("Unable to read header while deserializing snapshot", e);
+        }
+
+        if (bytesRead < HEADER_LENGTH) {
+            throw new SerializationException("Unable to read header while deserializing snapshot, expected "
+                    + HEADER_LENGTH + " bytes, but found " + bytesRead);
+        }
+
+        return header;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java
new file mode 100644
index 0000000..dd05e77
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/SerializationException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+/**
+ * Unchecked exception signalling an error during serialization or deserialization.
+ */
+public class SerializationException extends RuntimeException {
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public SerializationException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Creates the exception with a detail message and the underlying cause.
+     *
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public SerializationException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates the exception from an underlying cause only.
+     *
+     * @param cause the underlying cause
+     */
+    public SerializationException(final Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java
new file mode 100644
index 0000000..ca424ad
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/Serializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Serializes objects of type T to an OutputStream and de-serializes them from an InputStream.
+ */
+public interface Serializer<T> {
+
+    /**
+     * Serializes the given object to the given output stream.
+     * @param t the object to serialize
+     * @param out the output stream to serialize to
+     * @throws SerializationException if an error occurs during serialization
+     */
+    void serialize(T t, OutputStream out) throws SerializationException;
+
+    /**
+     * Deserializes the given InputStream back to an object of the given type.
+     * @param input the InputStream to deserialize
+     * @return the deserialized object
+     * @throws SerializationException if an error occurs during deserialization
+     */
+    T deserialize(InputStream input) throws SerializationException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java
new file mode 100644
index 0000000..83984b1
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBFlowSnapshotSerializer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization.jaxb;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+
+/**
+ * A JAXBSerializer bound to VersionedFlowSnapshot; it adds no behavior beyond fixing the type.
+ */
+public class JAXBFlowSnapshotSerializer extends JAXBSerializer<VersionedFlowSnapshot> {
+
+    public JAXBFlowSnapshotSerializer() {
+        super(VersionedFlowSnapshot.class); // the parent builds its JAXBContext from this class
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
new file mode 100644
index 0000000..eff655d
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/serialization/jaxb/JAXBSerializer.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization.jaxb;
+
+import org.apache.nifi.registry.serialization.SerializationException;
+import org.apache.nifi.registry.serialization.Serializer;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * A Serializer that uses JAXB for serializing/deserializing.
+ */
+public class JAXBSerializer<T> implements Serializer<T> {
+
+    private final Class<T> clazz;
+    private final JAXBContext jaxbContext;
+
+    /**
+     * Load the JAXBContext for the given class.
+     *
+     * @param clazz the class to bind; also used to type-check deserialized objects
+     * @throws RuntimeException if the JAXBContext cannot be created
+     */
+    public JAXBSerializer(final Class<T> clazz) {
+        try {
+            this.jaxbContext = JAXBContext.newInstance(clazz);
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+        this.clazz = clazz;
+    }
+
+    /**
+     * Marshals the given object to the stream as formatted XML.
+     *
+     * @param t the object to serialize
+     * @param out the output stream to serialize to
+     * @throws IllegalArgumentException if either argument is null
+     * @throws SerializationException if JAXB fails to marshal the object
+     */
+    @Override
+    public void serialize(final T t, final OutputStream out) throws SerializationException {
+        if (t == null) {
+            throw new IllegalArgumentException("The object to serialize cannot be null");
+        }
+
+        if (out == null) {
+            throw new IllegalArgumentException("OutputStream cannot be null");
+        }
+
+        try {
+            final Marshaller marshaller = jaxbContext.createMarshaller();
+            marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+            marshaller.marshal(t, out);
+        } catch (JAXBException e) {
+            throw new SerializationException("Unable to serialize object", e);
+        }
+    }
+
+    /**
+     * Unmarshals an object from the given stream and verifies that it is of the bound type.
+     *
+     * @param input the InputStream to deserialize
+     * @return the deserialized object
+     * @throws IllegalArgumentException if input is null
+     * @throws SerializationException if JAXB fails to unmarshal, or the result is not of the bound type
+     */
+    @Override
+    public T deserialize(final InputStream input) throws SerializationException {
+        if (input == null) {
+            throw new IllegalArgumentException("InputStream cannot be null");
+        }
+
+        final Object unmarshalled;
+        try {
+            final Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
+            unmarshalled = unmarshaller.unmarshal(input);
+        } catch (JAXBException e) {
+            throw new SerializationException("Unable to deserialize object", e);
+        }
+
+        // A checked cast replaces the unchecked (T) cast so a wrongly-typed result fails here, not at the caller.
+        if (unmarshalled != null && !clazz.isInstance(unmarshalled)) {
+            throw new SerializationException("Unable to deserialize object, expected " + clazz.getName()
+                    + " but found " + unmarshalled.getClass().getName());
+        }
+
+        return clazz.cast(unmarshalled);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
new file mode 100644
index 0000000..c80dc21
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/DataModelMapper.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.StandardBucketMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Static conversions between the Provider API metadata types and the registry data model types.
+ */
+public class DataModelMapper {
+
+    /**
+     * Maps provider bucket metadata to a data-model bucket, including its flows when present.
+     *
+     * @param bucketMetadata the provider metadata to convert
+     * @return the equivalent bucket
+     */
+    public static Bucket map(final BucketMetadata bucketMetadata) {
+        final Bucket result = new Bucket();
+        result.setIdentifier(bucketMetadata.getIdentifier());
+        result.setName(bucketMetadata.getName());
+        result.setDescription(bucketMetadata.getDescription());
+        result.setCreatedTimestamp(bucketMetadata.getCreatedTimestamp());
+
+        if (bucketMetadata.getFlowMetadata() == null) {
+            return result;
+        }
+
+        final Set<VersionedFlow> versionedFlows = new LinkedHashSet<>();
+        for (final FlowMetadata flowMetadata : bucketMetadata.getFlowMetadata()) {
+            versionedFlows.add(map(flowMetadata));
+        }
+        result.setVersionedFlows(versionedFlows);
+        return result;
+    }
+
+    /**
+     * Maps a data-model bucket to provider bucket metadata, including its flows when present.
+     *
+     * @param bucket the bucket to convert
+     * @return the equivalent provider metadata
+     */
+    public static BucketMetadata map(final Bucket bucket) {
+        final StandardBucketMetadata.Builder metadataBuilder = new StandardBucketMetadata.Builder()
+                .identifier(bucket.getIdentifier())
+                .name(bucket.getName())
+                .description(bucket.getDescription())
+                .created(bucket.getCreatedTimestamp());
+
+        if (bucket.getVersionedFlows() != null) {
+            for (final VersionedFlow versionedFlow : bucket.getVersionedFlows()) {
+                metadataBuilder.addFlow(map(versionedFlow));
+            }
+        }
+
+        return metadataBuilder.build();
+    }
+
+    /**
+     * Maps provider flow metadata to a data-model flow, including its snapshot metadata when present.
+     *
+     * @param flowMetadata the provider metadata to convert
+     * @return the equivalent versioned flow
+     */
+    public static VersionedFlow map(final FlowMetadata flowMetadata) {
+        final VersionedFlow result = new VersionedFlow();
+        result.setIdentifier(flowMetadata.getIdentifier());
+        result.setName(flowMetadata.getName());
+        result.setBucketIdentifier(flowMetadata.getBucketIdentifier());
+        result.setDescription(flowMetadata.getDescription());
+        result.setCreatedTimestamp(flowMetadata.getCreatedTimestamp());
+        result.setModifiedTimestamp(flowMetadata.getModifiedTimestamp());
+
+        if (flowMetadata.getSnapshotMetadata() == null) {
+            return result;
+        }
+
+        final SortedSet<VersionedFlowSnapshotMetadata> sortedSnapshots = new TreeSet<>();
+        for (final FlowSnapshotMetadata snapshotMetadata : flowMetadata.getSnapshotMetadata()) {
+            sortedSnapshots.add(map(snapshotMetadata));
+        }
+        result.setSnapshotMetadata(sortedSnapshots);
+        return result;
+    }
+
+    /**
+     * Maps a data-model flow to provider flow metadata, including its snapshot metadata when present.
+     *
+     * @param versionedFlow the flow to convert
+     * @return the equivalent provider metadata
+     */
+    public static FlowMetadata map(final VersionedFlow versionedFlow) {
+        final StandardFlowMetadata.Builder metadataBuilder = new StandardFlowMetadata.Builder()
+                .identifier(versionedFlow.getIdentifier())
+                .name(versionedFlow.getName())
+                .bucketIdentifier(versionedFlow.getBucketIdentifier())
+                .description(versionedFlow.getDescription())
+                .created(versionedFlow.getCreatedTimestamp())
+                .modified(versionedFlow.getModifiedTimestamp());
+
+        if (versionedFlow.getSnapshotMetadata() != null) {
+            for (final VersionedFlowSnapshotMetadata snapshotMetadata : versionedFlow.getSnapshotMetadata()) {
+                metadataBuilder.addSnapshot(map(snapshotMetadata));
+            }
+        }
+
+        return metadataBuilder.build();
+    }
+
+    /**
+     * Maps provider snapshot metadata to data-model snapshot metadata.
+     *
+     * @param flowSnapshotMetadata the provider metadata to convert
+     * @return the equivalent data-model snapshot metadata
+     */
+    public static VersionedFlowSnapshotMetadata map(final FlowSnapshotMetadata flowSnapshotMetadata) {
+        final VersionedFlowSnapshotMetadata result = new VersionedFlowSnapshotMetadata();
+        result.setBucketIdentifier(flowSnapshotMetadata.getBucketIdentifier());
+        result.setFlowIdentifier(flowSnapshotMetadata.getFlowIdentifier());
+        result.setFlowName(flowSnapshotMetadata.getFlowName());
+        result.setComments(flowSnapshotMetadata.getComments());
+        result.setTimestamp(flowSnapshotMetadata.getCreatedTimestamp());
+        result.setVersion(flowSnapshotMetadata.getVersion());
+        return result;
+    }
+
+    /**
+     * Maps data-model snapshot metadata to provider snapshot metadata.
+     *
+     * @param metadata the data-model snapshot metadata to convert
+     * @return the equivalent provider metadata
+     */
+    public static FlowSnapshotMetadata map(final VersionedFlowSnapshotMetadata metadata) {
+        return new StandardFlowSnapshotMetadata.Builder()
+                .bucketIdentifier(metadata.getBucketIdentifier())
+                .flowIdentifier(metadata.getFlowIdentifier())
+                .flowName(metadata.getFlowName())
+                .comments(metadata.getComments())
+                .created(metadata.getTimestamp())
+                .version(metadata.getVersion())
+                .build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
new file mode 100644
index 0000000..6f53957
--- /dev/null
+++ b/nifi-registry-framework/src/main/java/org/apache/nifi/registry/service/RegistryService.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.exception.ResourceNotFoundException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.StandardFlowSnapshotContext;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.MetadataProvider;
+import org.apache.nifi.registry.metadata.StandardBucketMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowMetadata;
+import org.apache.nifi.registry.serialization.Serializer;
+
+import javax.validation.ConstraintViolation;
+import javax.validation.ConstraintViolationException;
+import javax.validation.Validator;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+public class RegistryService {
+
+    // collaborators supplied through the constructor
+    private final MetadataProvider metadataProvider;
+    private final FlowPersistenceProvider flowPersistenceProvider;
+    private final Serializer<VersionedFlowSnapshot> snapshotSerializer;
+    private final Validator validator;
+
+    // one read/write lock shared by every operation of this service: the get* methods hold the read lock,
+    // while the create*, update* and delete* methods hold the write lock
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Lock readLock = lock.readLock();
+    private final Lock writeLock = lock.writeLock();
+
+    /**
+     * Creates the service from its collaborators. The arguments are stored as given, without null checks.
+     *
+     * @param metadataProvider the provider used to read and write bucket, flow and snapshot metadata
+     * @param flowPersistenceProvider the provider used to save, read and delete serialized snapshot content
+     * @param snapshotSerializer the serializer used to turn snapshots into bytes and back
+     * @param validator the bean validator applied to objects before they are created
+     */
+    public RegistryService(final MetadataProvider metadataProvider,
+                           final FlowPersistenceProvider flowPersistenceProvider,
+                           final Serializer<VersionedFlowSnapshot> snapshotSerializer,
+                           final Validator validator) {
+        this.validator = validator;
+        this.snapshotSerializer = snapshotSerializer;
+        this.flowPersistenceProvider = flowPersistenceProvider;
+        this.metadataProvider = metadataProvider;
+    }
+
+    /**
+     * Runs bean validation against the given object and fails if any constraint is violated.
+     *
+     * @param t the object to validate
+     * @param invalidMessage the message of the exception thrown when validation fails
+     * @throws ConstraintViolationException carrying the violations when at least one was reported
+     */
+    private <T> void validate(T t, String invalidMessage) {
+        final Set<ConstraintViolation<T>> constraintViolations = validator.validate(t);
+        if (constraintViolations.isEmpty()) {
+            return;
+        }
+
+        throw new ConstraintViolationException(invalidMessage, constraintViolations);
+    }
+
+    // ---------------------- Bucket methods ---------------------------------------------
+
+    /**
+     * Creates a new bucket.
+     *
+     * The passed-in instance is modified: its identifier and created timestamp are overwritten with
+     * generated values and its versioned flows are cleared before validation takes place.
+     *
+     * @param bucket the bucket to create
+     * @return the created bucket, mapped from what the metadata provider returned
+     * @throws IllegalArgumentException if the bucket is null
+     * @throws ConstraintViolationException if the bucket does not pass validation
+     * @throws IllegalStateException if a bucket with the same name already exists
+     */
+    public Bucket createBucket(final Bucket bucket) {
+        if (bucket == null) {
+            throw new IllegalArgumentException("Bucket cannot be null");
+        }
+
+        // the id and created time are always generated here, and the flows are read-only so they are dropped
+        final String generatedId = UUID.randomUUID().toString();
+        bucket.setIdentifier(generatedId);
+        bucket.setCreatedTimestamp(System.currentTimeMillis());
+        bucket.setVersionedFlows(null);
+
+        validate(bucket, "Bucket is not valid");
+
+        writeLock.lock();
+        try {
+            // bucket names must be unique
+            final BucketMetadata sameNameBucket = metadataProvider.getBucketByName(bucket.getName());
+            if (sameNameBucket != null) {
+                throw new IllegalStateException("A bucket with the same name already exists: " + sameNameBucket.getIdentifier());
+            }
+
+            final BucketMetadata persistedBucket = metadataProvider.createBucket(DataModelMapper.map(bucket));
+            return DataModelMapper.map(persistedBucket);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves the bucket with the given identifier.
+     *
+     * @param bucketIdentifier the identifier of the bucket to look up
+     * @return the matching bucket
+     * @throws IllegalArgumentException if the identifier is null
+     * @throws ResourceNotFoundException if the metadata provider has no bucket for the identifier
+     */
+    public Bucket getBucket(final String bucketIdentifier) {
+        if (bucketIdentifier == null) {
+            throw new IllegalArgumentException("Bucket Identifier cannot be null");
+        }
+
+        readLock.lock();
+        try {
+            final BucketMetadata found = metadataProvider.getBucketById(bucketIdentifier);
+            if (found != null) {
+                return DataModelMapper.map(found);
+            }
+
+            throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves every bucket returned by the metadata provider.
+     *
+     * @return the set of all buckets, each mapped to the data model
+     */
+    public Set<Bucket> getBuckets() {
+        readLock.lock();
+        try {
+            return metadataProvider.getBuckets()
+                    .stream()
+                    .map(bucketMetadata -> DataModelMapper.map(bucketMetadata))
+                    .collect(Collectors.toSet());
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Applies a partial update to an existing bucket.
+     *
+     * Only the name (when not blank) and the description (when not null) are transferred onto the
+     * existing bucket; all other values of the existing bucket are kept.
+     *
+     * @param bucket carries the identifier of the bucket to update plus the new values
+     * @return the updated bucket, mapped from what the metadata provider returned
+     * @throws IllegalArgumentException if the bucket or its identifier is null
+     * @throws ResourceNotFoundException if no bucket exists for the identifier
+     * @throws IllegalStateException if a different bucket already uses the requested name
+     */
+    public Bucket updateBucket(final Bucket bucket) {
+        if (bucket == null) {
+            throw new IllegalArgumentException("Bucket cannot be null");
+        }
+
+        final String targetId = bucket.getIdentifier();
+        if (targetId == null) {
+            throw new IllegalArgumentException("Bucket Identifier cannot be null");
+        }
+
+        // partial updates are allowed, so a blank name means the current name is left untouched
+        final String requestedName = bucket.getName();
+        final boolean nameProvided = StringUtils.isNotBlank(requestedName);
+
+        writeLock.lock();
+        try {
+            // the bucket being updated has to exist
+            final BucketMetadata current = metadataProvider.getBucketById(targetId);
+            if (current == null) {
+                throw new ResourceNotFoundException("Bucket does not exist for identifier: " + targetId);
+            }
+
+            // the requested name must not already be taken by a different bucket
+            if (nameProvided) {
+                final BucketMetadata sameNameBucket = metadataProvider.getBucketByName(requestedName);
+                if (sameNameBucket != null && !sameNameBucket.getIdentifier().equals(current.getIdentifier())) {
+                    throw new IllegalStateException("A bucket with the same name already exists: " + requestedName);
+                }
+            }
+
+            // start from the existing bucket and overlay only the values that were supplied
+            final StandardBucketMetadata.Builder updated = new StandardBucketMetadata.Builder(current);
+            if (nameProvided) {
+                updated.name(requestedName);
+            }
+            if (bucket.getDescription() != null) {
+                updated.description(bucket.getDescription());
+            }
+
+            final BucketMetadata updatedBucket = metadataProvider.updateBucket(updated.build());
+            return DataModelMapper.map(updatedBucket);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Deletes a bucket together with the snapshot content of every flow in it.
+     *
+     * @param bucketIdentifier the identifier of the bucket to delete
+     * @return the bucket as it existed before deletion
+     * @throws IllegalArgumentException if the identifier is null
+     * @throws ResourceNotFoundException if no bucket exists for the identifier
+     */
+    public Bucket deleteBucket(final String bucketIdentifier) {
+        if (bucketIdentifier == null) {
+            throw new IllegalArgumentException("Bucket Identifier cannot be null");
+        }
+
+        writeLock.lock();
+        try {
+            final BucketMetadata doomedBucket = metadataProvider.getBucketById(bucketIdentifier);
+            if (doomedBucket == null) {
+                throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
+            }
+
+            // remove the persisted snapshots of each flow that lives in this bucket
+            final Set<FlowMetadata> flowsInBucket = metadataProvider.getFlows(bucketIdentifier);
+            flowsInBucket.forEach(flow -> flowPersistenceProvider.deleteSnapshots(bucketIdentifier, flow.getIdentifier()));
+
+            // then remove the bucket from the metadata provider, which deletes all flows referencing it
+            metadataProvider.deleteBucket(bucketIdentifier);
+
+            return DataModelMapper.map(doomedBucket);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    // ---------------------- VersionedFlow methods ---------------------------------------------
+
+    /**
+     * Creates a new versioned flow in the given bucket.
+     *
+     * The passed-in flow is modified: a missing bucket identifier is filled in, the identifier and the
+     * created/modified timestamps are overwritten with generated values, and the snapshot metadata is
+     * cleared before validation takes place.
+     *
+     * @param bucketIdentifier the identifier of the bucket that will hold the flow
+     * @param versionedFlow the flow to create
+     * @return the created flow, mapped from what the metadata provider returned
+     * @throws IllegalArgumentException if the bucket identifier is blank, the flow is null, or the flow
+     *         names a different bucket than bucketIdentifier
+     * @throws ConstraintViolationException if the flow does not pass validation
+     * @throws ResourceNotFoundException if the bucket does not exist
+     * @throws IllegalStateException if a flow with the same name already exists
+     */
+    public VersionedFlow createFlow(final String bucketIdentifier, final VersionedFlow versionedFlow) {
+        if (StringUtils.isBlank(bucketIdentifier)) {
+            throw new IllegalArgumentException("Bucket Identifier cannot be null or blank");
+        }
+
+        if (versionedFlow == null) {
+            throw new IllegalArgumentException("VersionedFlow cannot be null");
+        }
+
+        // a bucket id on the flow is optional, but when present it has to agree with the one passed in
+        final String flowBucketId = versionedFlow.getBucketIdentifier();
+        if (flowBucketId == null) {
+            versionedFlow.setBucketIdentifier(bucketIdentifier);
+        } else if (!bucketIdentifier.equals(flowBucketId)) {
+            throw new IllegalArgumentException("Bucket identifiers must match");
+        }
+
+        versionedFlow.setIdentifier(UUID.randomUUID().toString());
+
+        // a new flow is created and last modified at the same instant
+        final long now = System.currentTimeMillis();
+        versionedFlow.setCreatedTimestamp(now);
+        versionedFlow.setModifiedTimestamp(now);
+
+        // snapshots are read-only, so whatever was sent is discarded
+        versionedFlow.setSnapshotMetadata(null);
+
+        validate(versionedFlow, "VersionedFlow is not valid");
+
+        writeLock.lock();
+        try {
+            // the target bucket has to exist
+            final BucketMetadata targetBucket = metadataProvider.getBucketById(bucketIdentifier);
+            if (targetBucket == null) {
+                throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketIdentifier);
+            }
+
+            // flow names must be unique
+            final FlowMetadata sameNameFlow = metadataProvider.getFlowByName(versionedFlow.getName());
+            if (sameNameFlow != null) {
+                throw new IllegalStateException("A VersionedFlow with the same name already exists: " + sameNameFlow.getIdentifier());
+            }
+
+            final FlowMetadata persistedFlow = metadataProvider.createFlow(bucketIdentifier, DataModelMapper.map(versionedFlow));
+            return DataModelMapper.map(persistedFlow);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves the versioned flow with the given identifier.
+     *
+     * @param flowIdentifier the identifier of the flow to look up
+     * @return the matching flow
+     * @throws IllegalArgumentException if the identifier is null or blank
+     * @throws ResourceNotFoundException if the metadata provider has no flow for the identifier
+     */
+    public VersionedFlow getFlow(final String flowIdentifier) {
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
+        }
+
+        readLock.lock();
+        try {
+            final FlowMetadata found = metadataProvider.getFlowById(flowIdentifier);
+            if (found != null) {
+                return DataModelMapper.map(found);
+            }
+
+            throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves every versioned flow returned by the metadata provider, regardless of bucket.
+     *
+     * @return the set of all flows, each mapped to the data model
+     */
+    public Set<VersionedFlow> getFlows() {
+        readLock.lock();
+        try {
+            return metadataProvider.getFlows()
+                    .stream()
+                    .map(flowMetadata -> DataModelMapper.map(flowMetadata))
+                    .collect(Collectors.toSet());
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public Set<VersionedFlow> getFlows(String bucketId) {
+        if (StringUtils.isBlank(bucketId)) {
+            throw new IllegalArgumentException("Bucket Identifier cannot be null");
+        }
+
+        readLock.lock();
+        try {
+            final BucketMetadata existingBucket = metadataProvider.getBucketById(bucketId);
+            if (existingBucket == null) {
+                throw new ResourceNotFoundException("Bucket does not exist for identifier: " + bucketId);
+            }
+
+            final Set<FlowMetadata> flows = metadataProvider.getFlows(bucketId);
+            return flows.stream().map(f -> DataModelMapper.map(f)).collect(Collectors.toSet());
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Applies a partial update to an existing versioned flow.
+     *
+     * Only the name (when not blank) and the description (when not null) are transferred onto the
+     * existing flow; the modified timestamp is always set to the current time.
+     *
+     * @param versionedFlow carries the identifier of the flow to update plus the new values
+     * @return the updated flow, mapped from what the metadata provider returned
+     * @throws IllegalArgumentException if the flow is null or its identifier is null or blank
+     * @throws ResourceNotFoundException if no flow exists for the identifier
+     * @throws IllegalStateException if a different flow already uses the requested name
+     */
+    public VersionedFlow updateFlow(final VersionedFlow versionedFlow) {
+        if (versionedFlow == null) {
+            throw new IllegalArgumentException("VersionedFlow cannot be null");
+        }
+
+        final String targetId = versionedFlow.getIdentifier();
+        if (StringUtils.isBlank(targetId)) {
+            throw new IllegalArgumentException("VersionedFlow identifier cannot be null or blank");
+        }
+
+        // partial updates are allowed, so a blank name means the current name is left untouched
+        final String requestedName = versionedFlow.getName();
+        final boolean nameProvided = StringUtils.isNotBlank(requestedName);
+
+        writeLock.lock();
+        try {
+            // the flow being updated has to exist
+            final FlowMetadata current = metadataProvider.getFlowById(targetId);
+            if (current == null) {
+                throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + targetId);
+            }
+
+            // the requested name must not already be taken by a different flow
+            if (nameProvided) {
+                final FlowMetadata sameNameFlow = metadataProvider.getFlowByName(requestedName);
+                if (sameNameFlow != null && !sameNameFlow.getIdentifier().equals(current.getIdentifier())) {
+                    throw new IllegalStateException("A VersionedFlow with the same name already exists: " + requestedName);
+                }
+            }
+
+            // start from the existing flow and overlay only the values that were supplied
+            final StandardFlowMetadata.Builder updated = new StandardFlowMetadata.Builder(current);
+            if (nameProvided) {
+                updated.name(requestedName);
+            }
+            if (versionedFlow.getDescription() != null) {
+                updated.description(versionedFlow.getDescription());
+            }
+            updated.modified(System.currentTimeMillis());
+
+            final FlowMetadata updatedFlow = metadataProvider.updateFlow(updated.build());
+            return DataModelMapper.map(updatedFlow);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Deletes a versioned flow together with the persisted content of all of its snapshots.
+     *
+     * @param flowIdentifier the identifier of the flow to delete
+     * @return the flow as it existed before deletion
+     * @throws IllegalArgumentException if the identifier is null or blank
+     * @throws ResourceNotFoundException if no flow exists for the identifier
+     */
+    public VersionedFlow deleteFlow(final String flowIdentifier) {
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
+        }
+
+        writeLock.lock();
+        try {
+            final FlowMetadata doomedFlow = metadataProvider.getFlowById(flowIdentifier);
+            if (doomedFlow == null) {
+                throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + flowIdentifier);
+            }
+
+            // snapshot content goes first, addressed by the bucket and id recorded on the flow itself
+            flowPersistenceProvider.deleteSnapshots(doomedFlow.getBucketIdentifier(), doomedFlow.getIdentifier());
+
+            // then the flow is removed from the metadata provider
+            metadataProvider.deleteFlow(flowIdentifier);
+
+            return DataModelMapper.map(doomedFlow);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    // ---------------------- VersionedFlowSnapshot methods ---------------------------------------------
+
+    /**
+     * Creates a new snapshot of an existing versioned flow.
+     *
+     * The timestamp of the snapshot metadata is overwritten with the current time. The snapshot is
+     * serialized and handed to the flow persistence provider first, and only then recorded in the
+     * metadata provider.
+     *
+     * @param flowSnapshot the snapshot to create, including the metadata naming its bucket, flow and version
+     * @return the same snapshot instance that was passed in
+     * @throws IllegalArgumentException if the snapshot is null
+     * @throws ConstraintViolationException if the snapshot does not pass validation
+     * @throws ResourceNotFoundException if the bucket or the flow named by the metadata does not exist
+     * @throws IllegalStateException if the flow is not located in the named bucket, or if the version is
+     *         not 1 for the first snapshot or exactly one higher than the last existing version
+     */
+    public VersionedFlowSnapshot createFlowSnapshot(final VersionedFlowSnapshot flowSnapshot) {
+        if (flowSnapshot == null) {
+            throw new IllegalArgumentException("VersionedFlowSnapshot cannot be null");
+        }
+
+        // validation will ensure that the metadata and contents are not null
+        if (flowSnapshot.getSnapshotMetadata() != null) {
+            flowSnapshot.getSnapshotMetadata().setTimestamp(System.currentTimeMillis());
+        }
+
+        validate(flowSnapshot, "VersionedFlowSnapshot is not valid");
+
+        writeLock.lock();
+        try {
+            final VersionedFlowSnapshotMetadata snapshotMetadata = flowSnapshot.getSnapshotMetadata();
+
+            // ensure the bucket exists
+            final BucketMetadata existingBucket = metadataProvider.getBucketById(snapshotMetadata.getBucketIdentifier());
+            if (existingBucket == null) {
+                throw new ResourceNotFoundException("Bucket does not exist for identifier: " + snapshotMetadata.getBucketIdentifier());
+            }
+
+            // ensure the flow exists
+            final FlowMetadata existingFlowMetadata = metadataProvider.getFlowById(snapshotMetadata.getFlowIdentifier());
+            if (existingFlowMetadata == null) {
+                throw new ResourceNotFoundException("VersionedFlow does not exist for identifier: " + snapshotMetadata.getFlowIdentifier());
+            }
+
+            // ensure the flow is located in the bucket the snapshot names; otherwise the content would be
+            // persisted under a bucket that deleteFlow, which uses the flow's own bucket id, never cleans up
+            if (!existingBucket.getIdentifier().equals(existingFlowMetadata.getBucketIdentifier())) {
+                throw new IllegalStateException("VersionedFlow " + snapshotMetadata.getFlowIdentifier()
+                        + " is not located in bucket " + snapshotMetadata.getBucketIdentifier());
+            }
+
+            final VersionedFlow existingFlow = DataModelMapper.map(existingFlowMetadata);
+
+            // if we already have snapshots we need to verify the new one has the correct version
+            if (existingFlow.getSnapshotMetadata() != null && existingFlow.getSnapshotMetadata().size() > 0) {
+                final VersionedFlowSnapshotMetadata lastSnapshot = existingFlow.getSnapshotMetadata().last();
+
+                // versions at or below the latest one are rejected as already taken
+                if (snapshotMetadata.getVersion() <= lastSnapshot.getVersion()) {
+                    throw new IllegalStateException("A VersionedFlowSnapshot with the same version already exists: " + snapshotMetadata.getVersion());
+                }
+
+                // gaps in the version sequence are not allowed either
+                if (snapshotMetadata.getVersion() > (lastSnapshot.getVersion() + 1)) {
+                    throw new IllegalStateException("Version must be a one-up number, last version was "
+                            + lastSnapshot.getVersion() + " and version for this snapshot was "
+                            + snapshotMetadata.getVersion());
+                }
+            } else if (snapshotMetadata.getVersion() != 1) {
+                throw new IllegalStateException("Version of first snapshot must be 1");
+            }
+
+            // serialize the snapshot
+            final ByteArrayOutputStream out = new ByteArrayOutputStream();
+            snapshotSerializer.serialize(flowSnapshot, out);
+
+            // save the serialized snapshot to the persistence provider
+            final Bucket bucket = DataModelMapper.map(existingBucket);
+            final FlowSnapshotContext context = new StandardFlowSnapshotContext.Builder(bucket, snapshotMetadata).build();
+            flowPersistenceProvider.saveSnapshot(context, out.toByteArray());
+
+            // create snapshot in the metadata provider
+            metadataProvider.createFlowSnapshot(DataModelMapper.map(snapshotMetadata));
+            return flowSnapshot;
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    /**
+     * Retrieves one version of a flow by loading its serialized content and deserializing it.
+     *
+     * @param flowIdentifier the identifier of the flow the snapshot belongs to
+     * @param version the version of the snapshot
+     * @return the deserialized snapshot
+     * @throws IllegalArgumentException if the flow identifier is null or blank, or the version is null
+     * @throws ResourceNotFoundException if the metadata provider has no such snapshot
+     * @throws IllegalStateException if the persistence provider returns no content for the snapshot
+     */
+    public VersionedFlowSnapshot getFlowSnapshot(final String flowIdentifier, final Integer version) {
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
+        }
+
+        if (version == null) {
+            throw new IllegalArgumentException("Version cannot be null or blank");
+        }
+
+        readLock.lock();
+        try {
+            // the metadata tells us whether the snapshot exists and where its content is stored
+            final FlowSnapshotMetadata located = metadataProvider.getFlowSnapshot(flowIdentifier, version);
+            if (located == null) {
+                throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow " + flowIdentifier + " and version " + version);
+            }
+
+            // fetch the serialized bytes using the coordinates recorded in the metadata
+            final byte[] content = flowPersistenceProvider.getSnapshot(
+                    located.getBucketIdentifier(), located.getFlowIdentifier(), located.getVersion());
+
+            final boolean contentMissing = content == null || content.length == 0;
+            if (contentMissing) {
+                throw new IllegalStateException("No serialized content found for snapshot with flow identifier "
+                        + flowIdentifier + " and version " + version);
+            }
+
+            final InputStream contentStream = new ByteArrayInputStream(content);
+            return snapshotSerializer.deserialize(contentStream);
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    /**
+     * Deletes one version of a flow, removing both its persisted content and its metadata.
+     *
+     * @param flowIdentifier the identifier of the flow the snapshot belongs to
+     * @param version the version of the snapshot to delete
+     * @return the metadata of the deleted snapshot
+     * @throws IllegalArgumentException if the flow identifier is null or blank, or the version is null
+     * @throws ResourceNotFoundException if the metadata provider has no such snapshot
+     */
+    public VersionedFlowSnapshotMetadata deleteFlowSnapshot(final String flowIdentifier, final Integer version) {
+        if (StringUtils.isBlank(flowIdentifier)) {
+            throw new IllegalArgumentException("Flow Identifier cannot be null or blank");
+        }
+
+        if (version == null) {
+            throw new IllegalArgumentException("Version cannot be null or blank");
+        }
+
+        writeLock.lock();
+        try {
+            final FlowSnapshotMetadata doomedSnapshot = metadataProvider.getFlowSnapshot(flowIdentifier, version);
+            if (doomedSnapshot == null) {
+                throw new ResourceNotFoundException("VersionedFlowSnapshot does not exist for flow "
+                        + flowIdentifier + " and version " + version);
+            }
+
+            // the content goes first, addressed by the coordinates recorded in the metadata
+            flowPersistenceProvider.deleteSnapshot(
+                    doomedSnapshot.getBucketIdentifier(), doomedSnapshot.getFlowIdentifier(), doomedSnapshot.getVersion());
+
+            // then the metadata entry itself
+            metadataProvider.deleteFlowSnapshot(flowIdentifier, version);
+
+            return DataModelMapper.map(doomedSnapshot);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
index 781f348..176a9a5 100644
--- a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/provider/MockMetadataProvider.java
@@ -43,7 +43,12 @@ public class MockMetadataProvider implements MetadataProvider {
     }
 
     @Override
-    public BucketMetadata getBucket(String bucketIdentifier) {
+    public BucketMetadata getBucketById(String bucketIdentifier) {
+        return null;
+    }
+
+    @Override
+    public BucketMetadata getBucketByName(String name) {
         return null;
     }
 
@@ -68,7 +73,12 @@ public class MockMetadataProvider implements MetadataProvider {
     }
 
     @Override
-    public FlowMetadata getFlow(String flowIdentifier) {
+    public FlowMetadata getFlowById(String flowIdentifier) {
+        return null;
+    }
+
+    @Override
+    public FlowMetadata getFlowByName(String name) {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java
new file mode 100644
index 0000000..14c78b2
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/TestFlowSnapshotSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.flow.VersionedProcessGroup;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Round-trip test for {@link FlowSnapshotSerializer}.
+ */
+public class TestFlowSnapshotSerializer {
+
+    /**
+     * Serializes a snapshot holding metadata and a process group, deserializes the produced bytes, and
+     * checks that the metadata fields and the process group identifier and name are unchanged.
+     */
+    @Test
+    public void testSerializeDeserializeFlowSnapshot() throws SerializationException {
+        // the snapshot to send through the serializer: metadata plus flow contents
+        final VersionedFlowSnapshotMetadata expectedMetadata = new VersionedFlowSnapshotMetadata();
+        expectedMetadata.setFlowIdentifier("flow1");
+        expectedMetadata.setFlowName("First Flow");
+        expectedMetadata.setVersion(1);
+        expectedMetadata.setComments("This is the first flow");
+        expectedMetadata.setTimestamp(System.currentTimeMillis());
+
+        final VersionedProcessGroup expectedGroup = new VersionedProcessGroup();
+        expectedGroup.setIdentifier("pg1");
+        expectedGroup.setName("My Process Group");
+
+        final VersionedFlowSnapshot original = new VersionedFlowSnapshot();
+        original.setSnapshotMetadata(expectedMetadata);
+        original.setFlowContents(expectedGroup);
+
+        // write it out and read it straight back in
+        final Serializer<VersionedFlowSnapshot> serializer = new FlowSnapshotSerializer();
+        final ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+        serializer.serialize(original, serialized);
+
+        final ByteArrayInputStream serializedInput = new ByteArrayInputStream(serialized.toByteArray());
+        final VersionedFlowSnapshot roundTripped = serializer.deserialize(serializedInput);
+        final VersionedFlowSnapshotMetadata actualMetadata = roundTripped.getSnapshotMetadata();
+        final VersionedProcessGroup actualGroup = roundTripped.getFlowContents();
+
+        // the metadata must be unchanged
+        Assert.assertEquals(expectedMetadata.getFlowIdentifier(), actualMetadata.getFlowIdentifier());
+        Assert.assertEquals(expectedMetadata.getFlowName(), actualMetadata.getFlowName());
+        Assert.assertEquals(expectedMetadata.getVersion(), actualMetadata.getVersion());
+        Assert.assertEquals(expectedMetadata.getComments(), actualMetadata.getComments());
+        Assert.assertEquals(expectedMetadata.getTimestamp(), actualMetadata.getTimestamp());
+
+        // and so must the flow contents
+        Assert.assertEquals(expectedGroup.getIdentifier(), actualGroup.getIdentifier());
+        Assert.assertEquals(expectedGroup.getName(), actualGroup.getName());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java
new file mode 100644
index 0000000..90accf6
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/serialization/jaxb/TestJAXBFlowSnapshotSerializer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.serialization.jaxb;
+
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.serialization.SerializationException;
+import org.apache.nifi.registry.serialization.Serializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Round-trip test for {@link JAXBFlowSnapshotSerializer}.
+ */
+public class TestJAXBFlowSnapshotSerializer {
+
+    /**
+     * Serializes a snapshot that only carries metadata, passes the output through a UTF-8 String,
+     * deserializes it again, and checks that the metadata fields are unchanged.
+     */
+    @Test
+    public void testSerializeDeserializeFlowSnapshot() throws SerializationException {
+        // the snapshot to send through the serializer; no flow contents are set
+        final VersionedFlowSnapshotMetadata expectedMetadata = new VersionedFlowSnapshotMetadata();
+        expectedMetadata.setFlowIdentifier("flow1");
+        expectedMetadata.setFlowName("First Flow");
+        expectedMetadata.setVersion(1);
+        expectedMetadata.setComments("This is the first flow");
+        expectedMetadata.setTimestamp(System.currentTimeMillis());
+
+        final VersionedFlowSnapshot original = new VersionedFlowSnapshot();
+        original.setSnapshotMetadata(expectedMetadata);
+
+        final Serializer<VersionedFlowSnapshot> serializer = new JAXBFlowSnapshotSerializer();
+        final ByteArrayOutputStream serialized = new ByteArrayOutputStream();
+        serializer.serialize(original, serialized);
+
+        // decode the output as UTF-8 text and re-encode it before reading it back
+        final String serializedText = new String(serialized.toByteArray(), StandardCharsets.UTF_8);
+        final ByteArrayInputStream serializedInput = new ByteArrayInputStream(serializedText.getBytes(StandardCharsets.UTF_8));
+
+        final VersionedFlowSnapshot roundTripped = serializer.deserialize(serializedInput);
+        final VersionedFlowSnapshotMetadata actualMetadata = roundTripped.getSnapshotMetadata();
+
+        Assert.assertEquals(expectedMetadata.getFlowIdentifier(), actualMetadata.getFlowIdentifier());
+        Assert.assertEquals(expectedMetadata.getFlowName(), actualMetadata.getFlowName());
+        Assert.assertEquals(expectedMetadata.getVersion(), actualMetadata.getVersion());
+        Assert.assertEquals(expectedMetadata.getComments(), actualMetadata.getComments());
+        Assert.assertEquals(expectedMetadata.getTimestamp(), actualMetadata.getTimestamp());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/a1629c86/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java
new file mode 100644
index 0000000..1a6f3c9
--- /dev/null
+++ b/nifi-registry-framework/src/test/java/org/apache/nifi/registry/service/TestDataModelMapper.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.service;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.BucketMetadata;
+import org.apache.nifi.registry.metadata.FlowMetadata;
+import org.apache.nifi.registry.metadata.FlowSnapshotMetadata;
+import org.apache.nifi.registry.metadata.StandardBucketMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowMetadata;
+import org.apache.nifi.registry.metadata.StandardFlowSnapshotMetadata;
+import org.junit.Test;
+
+import java.util.LinkedHashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/** Exercises {@link DataModelMapper} in both directions: Bucket to BucketMetadata, and BucketMetadata to Bucket. */
+public class TestDataModelMapper {
+
+    /** Maps a Bucket holding one flow with one snapshot and checks each level of the resulting BucketMetadata. */
+    @Test
+    public void testMapBucketToBucketMetadata() {
+        // input: the bucket that will be handed to the mapper
+        final Bucket expectedBucket = new Bucket();
+        expectedBucket.setIdentifier("bucket1");
+        expectedBucket.setName("Bucket 1");
+        expectedBucket.setDescription("This is bucket 1.");
+        expectedBucket.setCreatedTimestamp(System.currentTimeMillis());
+
+        // input: a single flow that points back at the bucket by identifier
+        final VersionedFlow expectedFlow = new VersionedFlow();
+        expectedFlow.setIdentifier("flow1");
+        expectedFlow.setName("Flow 1");
+        expectedFlow.setDescription("This is flow 1");
+        expectedFlow.setBucketIdentifier(expectedBucket.getIdentifier());
+        expectedFlow.setCreatedTimestamp(System.currentTimeMillis());
+        expectedFlow.setModifiedTimestamp(System.currentTimeMillis());
+
+        // input: a single snapshot that points back at both the bucket and the flow
+        final VersionedFlowSnapshotMetadata expectedSnapshot = new VersionedFlowSnapshotMetadata();
+        expectedSnapshot.setBucketIdentifier(expectedBucket.getIdentifier());
+        expectedSnapshot.setFlowIdentifier(expectedFlow.getIdentifier());
+        expectedSnapshot.setFlowName(expectedFlow.getName());
+        expectedSnapshot.setVersion(1);
+        expectedSnapshot.setTimestamp(System.currentTimeMillis());
+        expectedSnapshot.setComments("This is snapshot 1 of flow 1");
+
+        // wire the hierarchy together: snapshot into the flow, then the flow into the bucket
+        final SortedSet<VersionedFlowSnapshotMetadata> expectedSnapshots = new TreeSet<>();
+        expectedSnapshots.add(expectedSnapshot);
+        expectedFlow.setSnapshotMetadata(expectedSnapshots);
+
+        final Set<VersionedFlow> expectedFlows = new LinkedHashSet<>();
+        expectedFlows.add(expectedFlow);
+        expectedBucket.setVersionedFlows(expectedFlows);
+
+        // run the mapping and compare the bucket-level fields
+        final BucketMetadata actualBucket = DataModelMapper.map(expectedBucket);
+        assertEquals(expectedBucket.getIdentifier(), actualBucket.getIdentifier());
+        assertEquals(expectedBucket.getName(), actualBucket.getName());
+        assertEquals(expectedBucket.getDescription(), actualBucket.getDescription());
+        assertEquals(expectedBucket.getCreatedTimestamp(), actualBucket.getCreatedTimestamp());
+
+        // the mapped bucket must carry exactly one flow whose fields match the input flow
+        assertNotNull(actualBucket.getFlowMetadata());
+        assertEquals(1, actualBucket.getFlowMetadata().size());
+        final FlowMetadata actualFlow = actualBucket.getFlowMetadata().iterator().next();
+        assertNotNull(actualFlow);
+        assertEquals(expectedFlow.getIdentifier(), actualFlow.getIdentifier());
+        assertEquals(expectedFlow.getName(), actualFlow.getName());
+        assertEquals(expectedFlow.getDescription(), actualFlow.getDescription());
+        assertEquals(expectedFlow.getBucketIdentifier(), actualFlow.getBucketIdentifier());
+        assertEquals(expectedFlow.getCreatedTimestamp(), actualFlow.getCreatedTimestamp());
+        assertEquals(expectedFlow.getModifiedTimestamp(), actualFlow.getModifiedTimestamp());
+
+        // the mapped flow must carry exactly one snapshot; its timestamp is read via getCreatedTimestamp()
+        assertNotNull(actualFlow.getSnapshotMetadata());
+        assertEquals(1, actualFlow.getSnapshotMetadata().size());
+        final FlowSnapshotMetadata actualSnapshot = actualFlow.getSnapshotMetadata().iterator().next();
+        assertNotNull(actualSnapshot);
+        assertEquals(expectedSnapshot.getFlowIdentifier(), actualSnapshot.getFlowIdentifier());
+        assertEquals(expectedSnapshot.getFlowName(), actualSnapshot.getFlowName());
+        assertEquals(expectedSnapshot.getBucketIdentifier(), actualSnapshot.getBucketIdentifier());
+        assertEquals(expectedSnapshot.getVersion(), actualSnapshot.getVersion());
+        assertEquals(expectedSnapshot.getComments(), actualSnapshot.getComments());
+        assertEquals(expectedSnapshot.getTimestamp(), actualSnapshot.getCreatedTimestamp());
+    }
+
+    /** Maps a BucketMetadata holding one flow with one snapshot and checks each level of the resulting Bucket. */
+    @Test
+    public void testMapBucketMetadataToBucket() {
+        // input: snapshot metadata, built first so the flow builder can reference it
+        final FlowSnapshotMetadata expectedSnapshot = new StandardFlowSnapshotMetadata.Builder()
+                .flowIdentifier("flow1")
+                .flowName("Flow 1")
+                .bucketIdentifier("bucket1")
+                .version(1)
+                .comments("This is snapshot 1 of flow 1.")
+                .created(System.currentTimeMillis())
+                .build();
+
+        // input: flow metadata that owns the snapshot above
+        final FlowMetadata expectedFlow = new StandardFlowMetadata.Builder()
+                .identifier("flow1")
+                .name("Flow 1")
+                .bucketIdentifier("bucket1")
+                .description("This flow 1.")
+                .created(System.currentTimeMillis())
+                .modified(System.currentTimeMillis())
+                .addSnapshot(expectedSnapshot)
+                .build();
+
+        // input: bucket metadata that owns the flow above
+        final BucketMetadata expectedBucket = new StandardBucketMetadata.Builder()
+                .identifier("bucket1")
+                .name("Bucket 1")
+                .description("This is bucket 1.")
+                .created(System.currentTimeMillis())
+                .addFlow(expectedFlow)
+                .build();
+
+        // run the mapping and compare the bucket-level fields
+        final Bucket actualBucket = DataModelMapper.map(expectedBucket);
+        assertEquals(expectedBucket.getIdentifier(), actualBucket.getIdentifier());
+        assertEquals(expectedBucket.getName(), actualBucket.getName());
+        assertEquals(expectedBucket.getDescription(), actualBucket.getDescription());
+        assertEquals(expectedBucket.getCreatedTimestamp(), actualBucket.getCreatedTimestamp());
+
+        // the mapped bucket must carry exactly one flow whose fields match the input flow
+        assertNotNull(actualBucket.getVersionedFlows());
+        assertEquals(1, actualBucket.getVersionedFlows().size());
+        final VersionedFlow actualFlow = actualBucket.getVersionedFlows().iterator().next();
+        assertNotNull(actualFlow);
+        assertEquals(expectedFlow.getIdentifier(), actualFlow.getIdentifier());
+        assertEquals(expectedFlow.getName(), actualFlow.getName());
+        assertEquals(expectedFlow.getBucketIdentifier(), actualFlow.getBucketIdentifier());
+        assertEquals(expectedFlow.getDescription(), actualFlow.getDescription());
+        assertEquals(expectedFlow.getCreatedTimestamp(), actualFlow.getCreatedTimestamp());
+        assertEquals(expectedFlow.getModifiedTimestamp(), actualFlow.getModifiedTimestamp());
+
+        // the mapped flow must carry exactly one snapshot; its created time is read via getTimestamp()
+        assertNotNull(actualFlow.getSnapshotMetadata());
+        assertEquals(1, actualFlow.getSnapshotMetadata().size());
+        final VersionedFlowSnapshotMetadata actualSnapshot = actualFlow.getSnapshotMetadata().first();
+        assertEquals(expectedSnapshot.getFlowIdentifier(), actualSnapshot.getFlowIdentifier());
+        assertEquals(expectedSnapshot.getFlowName(), actualSnapshot.getFlowName());
+        assertEquals(expectedSnapshot.getBucketIdentifier(), actualSnapshot.getBucketIdentifier());
+        assertEquals(expectedSnapshot.getVersion(), actualSnapshot.getVersion());
+        assertEquals(expectedSnapshot.getComments(), actualSnapshot.getComments());
+        assertEquals(expectedSnapshot.getCreatedTimestamp(), actualSnapshot.getTimestamp());
+    }
+
+}