You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2018/09/22 02:11:38 UTC

[39/51] [partial] nifi-registry git commit: NIFIREG-201 Refactoring project structure to better isolate extensions

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java
new file mode 100644
index 0000000..ec6b9a5
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.migration;
+
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * FlowSnapshot DB entity from the original database schema in 0.1.0, used for migration purposes.
+ */
+public class FlowSnapshotEntityV1 {
+
+    private String flowId;
+
+    private Integer version;
+
+    private Date created;
+
+    private String createdBy;
+
+    private String comments;
+
+    public String getFlowId() {
+        return flowId;
+    }
+
+    public void setFlowId(String flowId) {
+        this.flowId = flowId;
+    }
+
+    public Integer getVersion() {
+        return version;
+    }
+
+    public void setVersion(Integer version) {
+        this.version = version;
+    }
+
+    public Date getCreated() {
+        return created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public String getCreatedBy() {
+        return createdBy;
+    }
+
+    public void setCreatedBy(String createdBy) {
+        this.createdBy = createdBy;
+    }
+
+    public String getComments() {
+        return comments;
+    }
+
+    public void setComments(String comments) {
+        this.comments = comments;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.flowId, this.version);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null) {
+            return false;
+        }
+
+        if (!(obj instanceof FlowSnapshotEntityV1)) {
+            return false;
+        }
+
+        final FlowSnapshotEntityV1 other = (FlowSnapshotEntityV1) obj;
+        return Objects.equals(this.flowId, other.flowId) && Objects.equals(this.version, other.version);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java
new file mode 100644
index 0000000..72d3acf
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.migration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.h2.jdbcx.JdbcConnectionPool;
+
+import javax.sql.DataSource;
+import java.io.File;
+
+/**
+ * NOTE: This DataSource factory was used in the original 0.1.0 release and remains to migrate data from the old database.
+ * This class is intentionally not a Spring bean, and will be used manually in the custom Flyway migration.
+ */
+public class LegacyDataSourceFactory {
+
+    private static final String DB_USERNAME_PASSWORD = "nifireg";
+    private static final int MAX_CONNECTIONS = 5;
+
+    // database file name
+    private static final String DATABASE_FILE_NAME = "nifi-registry";
+
+    private final NiFiRegistryProperties properties;
+
+    private JdbcConnectionPool connectionPool;
+
+    public LegacyDataSourceFactory(final NiFiRegistryProperties properties) {
+        this.properties = properties;
+    }
+
+    public DataSource getDataSource() {
+        if (connectionPool == null) {
+            final String databaseUrl = getDatabaseUrl(properties);
+            connectionPool = JdbcConnectionPool.create(databaseUrl, DB_USERNAME_PASSWORD, DB_USERNAME_PASSWORD);
+            connectionPool.setMaxConnections(MAX_CONNECTIONS);
+        }
+
+        return connectionPool;
+    }
+
+    public static String getDatabaseUrl(final NiFiRegistryProperties properties) {
+        // locate the repository directory
+        final String repositoryDirectoryPath = properties.getLegacyDatabaseDirectory();
+
+        // ensure the repository directory is specified
+        if (repositoryDirectoryPath == null) {
+            throw new NullPointerException("Database directory must be specified.");
+        }
+
+        // create a handle to the repository directory
+        final File repositoryDirectory = new File(repositoryDirectoryPath);
+
+        // get a handle to the database file
+        final File databaseFile = new File(repositoryDirectory, DATABASE_FILE_NAME);
+
+        // format the database url
+        String databaseUrl = "jdbc:h2:" + databaseFile + ";AUTOCOMMIT=OFF;DB_CLOSE_ON_EXIT=FALSE;LOCK_MODE=3";
+        String databaseUrlAppend = properties.getLegacyDatabaseUrlAppend();
+        if (StringUtils.isNotBlank(databaseUrlAppend)) {
+            databaseUrl += databaseUrlAppend;
+        }
+
+        return databaseUrl;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java
new file mode 100644
index 0000000..533fadd
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.migration;
+
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import javax.sql.DataSource;
+import java.util.List;
+
+/**
+ * Service used to load data from original database used in the 0.1.0 release.
+ */
+public class LegacyDatabaseService {
+
+    private final JdbcTemplate jdbcTemplate;
+
+    public LegacyDatabaseService(final DataSource dataSource) {
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
+    }
+
+    public List<BucketEntityV1> getAllBuckets() {
+        final String sql = "SELECT * FROM bucket ORDER BY name ASC";
+
+        return jdbcTemplate.query(sql, (rs, i) -> {
+            final BucketEntityV1 b = new BucketEntityV1();
+            b.setId(rs.getString("ID"));
+            b.setName(rs.getString("NAME"));
+            b.setDescription(rs.getString("DESCRIPTION"));
+            b.setCreated(rs.getTimestamp("CREATED"));
+            return b;
+        });
+    }
+
+    public List<FlowEntityV1> getAllFlows() {
+        final String sql = "SELECT * FROM flow f, bucket_item item WHERE item.id = f.id";
+
+        return jdbcTemplate.query(sql, (rs, i) -> {
+            final FlowEntityV1 flowEntity = new FlowEntityV1();
+            flowEntity.setId(rs.getString("ID"));
+            flowEntity.setName(rs.getString("NAME"));
+            flowEntity.setDescription(rs.getString("DESCRIPTION"));
+            flowEntity.setCreated(rs.getTimestamp("CREATED"));
+            flowEntity.setModified(rs.getTimestamp("MODIFIED"));
+            flowEntity.setBucketId(rs.getString("BUCKET_ID"));
+            return flowEntity;
+        });
+    }
+
+    public List<FlowSnapshotEntityV1> getAllFlowSnapshots() {
+        final String sql = "SELECT * FROM flow_snapshot fs";
+
+        return jdbcTemplate.query(sql, (rs, i) -> {
+            final FlowSnapshotEntityV1 fs = new FlowSnapshotEntityV1();
+            fs.setFlowId(rs.getString("FLOW_ID"));
+            fs.setVersion(rs.getInt("VERSION"));
+            fs.setCreated(rs.getTimestamp("CREATED"));
+            fs.setCreatedBy(rs.getString("CREATED_BY"));
+            fs.setComments(rs.getString("COMMENTS"));
+            return fs;
+        });
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java
new file mode 100644
index 0000000..bf82aae
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.migration;
+
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntityType;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+
+/**
+ * Utility methods to map legacy DB entities to current DB entities.
+ *
+ * The initial implementations of these mappings will be almost a direct translation, but if future changes are made
+ * to the original tables these methods will handle the translation from old entity to new entity.
+ */
+public class LegacyEntityMapper {
+
+    public static BucketEntity createBucketEntity(final BucketEntityV1 bucketEntityV1) {
+        final BucketEntity bucketEntity = new BucketEntity();
+        bucketEntity.setId(bucketEntityV1.getId());
+        bucketEntity.setName(bucketEntityV1.getName());
+        bucketEntity.setDescription(bucketEntityV1.getDescription());
+        bucketEntity.setCreated(bucketEntityV1.getCreated());
+        return bucketEntity;
+    }
+
+    public static FlowEntity createFlowEntity(final FlowEntityV1 flowEntityV1) {
+        final FlowEntity flowEntity = new FlowEntity();
+        flowEntity.setId(flowEntityV1.getId());
+        flowEntity.setName(flowEntityV1.getName());
+        flowEntity.setDescription(flowEntityV1.getDescription());
+        flowEntity.setCreated(flowEntityV1.getCreated());
+        flowEntity.setModified(flowEntityV1.getModified());
+        flowEntity.setBucketId(flowEntityV1.getBucketId());
+        flowEntity.setType(BucketItemEntityType.FLOW);
+        return flowEntity;
+    }
+
+    public static FlowSnapshotEntity createFlowSnapshotEntity(final FlowSnapshotEntityV1 flowSnapshotEntityV1) {
+        final FlowSnapshotEntity flowSnapshotEntity = new FlowSnapshotEntity();
+        flowSnapshotEntity.setFlowId(flowSnapshotEntityV1.getFlowId());
+        flowSnapshotEntity.setVersion(flowSnapshotEntityV1.getVersion());
+        flowSnapshotEntity.setComments(flowSnapshotEntityV1.getComments());
+        flowSnapshotEntity.setCreated(flowSnapshotEntityV1.getCreated());
+        flowSnapshotEntity.setCreatedBy(flowSnapshotEntityV1.getCreatedBy());
+        return flowSnapshotEntity;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java
new file mode 100644
index 0000000..b837d6d
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.event;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventFieldName;
+import org.apache.nifi.registry.hook.EventType;
+import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils;
+
+/**
+ * Factory to create Events from domain objects.
+ */
+public class EventFactory {
+
+    public static Event bucketCreated(final Bucket bucket) {
+        return new StandardEvent.Builder()
+                .eventType(EventType.CREATE_BUCKET)
+                .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
+                .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
+                .build();
+    }
+
+    public static Event bucketUpdated(final Bucket bucket) {
+        return new StandardEvent.Builder()
+                .eventType(EventType.UPDATE_BUCKET)
+                .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
+                .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
+                .build();
+    }
+
+    public static Event bucketDeleted(final Bucket bucket) {
+        return new StandardEvent.Builder()
+                .eventType(EventType.DELETE_BUCKET)
+                .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
+                .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
+                .build();
+    }
+
+    public static Event flowCreated(final VersionedFlow versionedFlow) {
+        return new StandardEvent.Builder()
+                .eventType(EventType.CREATE_FLOW)
+                .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
+                .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
+                .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
+                .build();
+    }
+
+    public static Event flowUpdated(final VersionedFlow versionedFlow) {
+        return new StandardEvent.Builder()
+                .eventType(EventType.UPDATE_FLOW)
+                .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
+                .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
+                .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
+                .build();
+    }
+
+    public static Event flowDeleted(final VersionedFlow versionedFlow) {
+        return new StandardEvent.Builder()
+                .eventType(EventType.DELETE_FLOW)
+                .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
+                .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
+                .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
+                .build();
+    }
+
+    public static Event flowVersionCreated(final VersionedFlowSnapshot versionedFlowSnapshot) {
+        final String versionComments = versionedFlowSnapshot.getSnapshotMetadata().getComments() == null
+                ? "" : versionedFlowSnapshot.getSnapshotMetadata().getComments();
+
+        return new StandardEvent.Builder()
+                .eventType(EventType.CREATE_FLOW_VERSION)
+                .addField(EventFieldName.BUCKET_ID, versionedFlowSnapshot.getSnapshotMetadata().getBucketIdentifier())
+                .addField(EventFieldName.FLOW_ID, versionedFlowSnapshot.getSnapshotMetadata().getFlowIdentifier())
+                .addField(EventFieldName.VERSION, String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getVersion()))
+                .addField(EventFieldName.USER, versionedFlowSnapshot.getSnapshotMetadata().getAuthor())
+                .addField(EventFieldName.COMMENT, versionComments)
+                .build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java
new file mode 100644
index 0000000..8a11493
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.event;
+
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventHookProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service used for publishing events and passing events to the hook providers.
+ */
+@Service
+public class EventService implements DisposableBean {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventService.class);
+
+    // Should only be a few events in the queue at a time, but setting a capacity just so it isn't unbounded
+    static final int EVENT_QUEUE_SIZE = 10_000;
+
+    private final BlockingQueue<Event> eventQueue;
+    private final ExecutorService scheduledExecutorService;
+    private final List<EventHookProvider> eventHookProviders;
+
+    @Autowired
+    public EventService(final List<EventHookProvider> eventHookProviders) {
+        this.eventQueue = new LinkedBlockingQueue<>(EVENT_QUEUE_SIZE);
+        this.scheduledExecutorService = Executors.newSingleThreadExecutor();
+        this.eventHookProviders = new ArrayList<>(eventHookProviders);
+    }
+
+    @PostConstruct
+    public void postConstruct() {
+        LOGGER.info("Starting event consumer...");
+
+        this.scheduledExecutorService.execute(() -> {
+            while (!Thread.interrupted()) {
+                try {
+                    final Event event = eventQueue.poll(1000, TimeUnit.MILLISECONDS);
+                    if (event == null) {
+                        continue;
+                    }
+
+                    // event was available so notify each provider, contain errors per-provider
+                    for(final EventHookProvider provider : eventHookProviders) {
+                        try {
+                            if (event.getEventType() == null
+                                    || (event.getEventType() != null && provider.shouldHandle(event.getEventType()))) {
+                                provider.handle(event);
+                            }
+                        } catch (Exception e) {
+                            LOGGER.error("Error handling event hook", e);
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.warn("Interrupted while polling event queue");
+                    return;
+                }
+            }
+        });
+
+        LOGGER.info("Event consumer started!");
+    }
+
+    @Override
+    public void destroy() throws Exception {
+        LOGGER.info("Shutting down event consumer...");
+        this.scheduledExecutorService.shutdownNow();
+        LOGGER.info("Event consumer shutdown!");
+    }
+
+    public void publish(final Event event) {
+        if (event == null) {
+            return;
+        }
+
+        try {
+            event.validate();
+
+            final boolean queued = eventQueue.offer(event);
+            if (!queued) {
+                LOGGER.error("Unable to queue event because queue is full");
+            }
+        } catch (IllegalStateException e) {
+            LOGGER.error("Invalid event due to: " + e.getMessage(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java
new file mode 100644
index 0000000..4ad459d
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.event;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventField;
+import org.apache.nifi.registry.hook.EventFieldName;
+import org.apache.nifi.registry.hook.EventType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Standard implementation of Event.
+ */
+public class StandardEvent implements Event {
+
+    private final EventType eventType;
+
+    private final List<EventField> eventFields;
+
+    private StandardEvent(final Builder builder) {
+        this.eventType = builder.eventType;
+        this.eventFields = Collections.unmodifiableList(builder.eventFields == null
+                ? Collections.emptyList() : new ArrayList<>(builder.eventFields));
+        Validate.notNull(this.eventType);
+    }
+
+    @Override
+    public EventType getEventType() {
+        return eventType;
+    }
+
+    @Override
+    public List<EventField> getFields() {
+        return eventFields;
+    }
+
+    @Override
+    public EventField getField(final EventFieldName fieldName) {
+        if (fieldName == null) {
+            return null;
+        }
+
+        return eventFields.stream().filter(e -> fieldName.equals(e.getName())).findFirst().orElse(null);
+    }
+
+    @Override
+    public void validate() throws IllegalStateException {
+        final int numProvidedFields = eventFields.size();
+        final int numRequiredFields = eventType.getFieldNames().size();
+
+        if (numProvidedFields != numRequiredFields) {
+            throw new IllegalStateException(numRequiredFields + " fields were required, but only " + numProvidedFields + " were provided");
+        }
+
+        for (int i=0; i < numRequiredFields; i++) {
+            final EventFieldName required = eventType.getFieldNames().get(i);
+            final EventFieldName provided = eventFields.get(i).getName();
+            if (!required.equals(provided)) {
+                throw new IllegalStateException("Expected " + required.name() + ", but found " + provided.name());
+            }
+        }
+    }
+
+    /**
+     * Builder for Events.
+     */
+    public static class Builder {
+
+        private EventType eventType;
+        private List<EventField> eventFields = new ArrayList<>();
+
+        public Builder eventType(final EventType eventType) {
+            this.eventType = eventType;
+            return this;
+        }
+
+        public Builder addField(final EventFieldName name, final String value) {
+            this.eventFields.add(new StandardEventField(name, value));
+            return this;
+        }
+
+        public Builder addField(final EventField arg) {
+            if (arg != null) {
+                this.eventFields.add(arg);
+            }
+            return this;
+        }
+
+        public Builder addFields(final Collection<EventField> fields) {
+            if (fields != null) {
+                this.eventFields.addAll(fields);
+            }
+            return this;
+        }
+
+        public Builder clearFields() {
+            this.eventFields.clear();
+            return this;
+        }
+
+        public Event build() {
+            return new StandardEvent(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java
new file mode 100644
index 0000000..21266bb
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.event;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.hook.EventField;
+import org.apache.nifi.registry.hook.EventFieldName;
+
+/**
+ * Standard implementation of EventField.
+ */
+public class StandardEventField implements EventField {
+
+    private final EventFieldName name;
+
+    private final String value;
+
+    public StandardEventField(final EventFieldName name, final String value) {
+        this.name = name;
+        this.value = value;
+        Validate.notNull(this.name);
+        Validate.notNull(this.value);
+    }
+
+    @Override
+    public EventFieldName getName() {
+        return name;
+    }
+
+    @Override
+    public String getValue() {
+        return value;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java
new file mode 100644
index 0000000..8f9180c
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.exception;
+
+/**
+ *
+ */
+public class AdministrationException extends RuntimeException {
+
+    public AdministrationException(Throwable cause) {
+        super(cause);
+    }
+
+    public AdministrationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public AdministrationException(String message) {
+        super(message);
+    }
+
+    public AdministrationException() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
new file mode 100644
index 0000000..a83e9e2
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.exception;
+
+/**
+ * An exception that is thrown when an entity is not found.
+ */
+public class ResourceNotFoundException extends RuntimeException {
+
+    public ResourceNotFoundException(String message) {
+        super(message);
+    }
+
+    public ResourceNotFoundException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java
new file mode 100644
index 0000000..b24f950
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.extension;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class ExtensionCloseable implements Closeable {
+    private final ClassLoader toSet;
+
+    private ExtensionCloseable(ClassLoader toSet) {
+        this.toSet = toSet;
+    }
+
+    public static ExtensionCloseable withComponentClassLoader(final ExtensionManager manager, final Class componentClass) {
+
+        final ClassLoader current = Thread.currentThread().getContextClassLoader();
+        final ExtensionCloseable closeable = new ExtensionCloseable(current);
+
+        ClassLoader componentClassLoader = manager.getExtensionClassLoader(componentClass.getName());
+        if (componentClassLoader == null) {
+            componentClassLoader = componentClass.getClassLoader();
+        }
+
+        Thread.currentThread().setContextClassLoader(componentClassLoader);
+        return closeable;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (toSet != null) {
+            Thread.currentThread().setContextClassLoader(toSet);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java
new file mode 100644
index 0000000..ca3259d
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.extension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.hook.EventHookProvider;
+import org.apache.nifi.registry.security.authentication.IdentityProvider;
+import org.apache.nifi.registry.security.authorization.AccessPolicyProvider;
+import org.apache.nifi.registry.security.authorization.Authorizer;
+import org.apache.nifi.registry.security.authorization.UserGroupProvider;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@Component
+public class ExtensionManager {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(ExtensionManager.class);
+
+    private static final List<Class> EXTENSION_CLASSES;
+    static {
+        final List<Class> classes = new ArrayList<>();
+        classes.add(FlowPersistenceProvider.class);
+        classes.add(UserGroupProvider.class);
+        classes.add(AccessPolicyProvider.class);
+        classes.add(Authorizer.class);
+        classes.add(IdentityProvider.class);
+        classes.add(EventHookProvider.class);
+        EXTENSION_CLASSES = Collections.unmodifiableList(classes);
+    }
+
+    private final NiFiRegistryProperties properties;
+    private final Map<String,ExtensionClassLoader> classLoaderMap = new HashMap<>();
+    private final AtomicBoolean loaded = new AtomicBoolean(false);
+
+    @Autowired
+    public ExtensionManager(final NiFiRegistryProperties properties) {
+        this.properties = properties;
+    }
+
+    @PostConstruct
+    public synchronized void discoverExtensions() {
+        if (!loaded.get()) {
+            // get the list of class loaders to consider
+            final List<ExtensionClassLoader> classLoaders = getClassLoaders();
+
+            // for each class loader, attempt to load each extension class using the ServiceLoader
+            for (final ExtensionClassLoader extensionClassLoader : classLoaders) {
+                for (final Class extensionClass : EXTENSION_CLASSES) {
+                    loadExtensions(extensionClass, extensionClassLoader);
+                }
+            }
+
+            loaded.set(true);
+        }
+    }
+
+    public ClassLoader getExtensionClassLoader(final String canonicalClassName) {
+        if (StringUtils.isBlank(canonicalClassName)) {
+            throw new IllegalArgumentException("Class name can not be null");
+        }
+
+        return classLoaderMap.get(canonicalClassName);
+    }
+
+    /**
+     * Loads implementations of the given extension class from the given class loader.
+     *
+     * @param extensionClass the extension/service class
+     * @param extensionClassLoader the class loader to search
+     */
+    private void loadExtensions(final Class extensionClass, final ExtensionClassLoader extensionClassLoader) {
+        final ServiceLoader<?> serviceLoader = ServiceLoader.load(extensionClass, extensionClassLoader);
+        for (final Object o : serviceLoader) {
+            final String extensionClassName = o.getClass().getCanonicalName();
+            if (classLoaderMap.containsKey(extensionClassName)) {
+                final String currDir = extensionClassLoader.getRootDir();
+                final String existingDir = classLoaderMap.get(extensionClassName).getRootDir();
+                LOGGER.warn("Skipping {} from {} which was already found in {}", new Object[]{extensionClassName, currDir, existingDir});
+            } else {
+                classLoaderMap.put(o.getClass().getCanonicalName(), extensionClassLoader);
+            }
+        }
+    }
+
+    /**
+     * Gets all of the class loaders to consider for loading extensions.
+     *
+     * Includes the class loader of the web-app running the framework, plus a class loader for each additional
+     * directory specified in nifi-registry.properties.
+     *
+     * @return a list of extension class loaders
+     */
+    private List<ExtensionClassLoader> getClassLoaders() {
+        final List<ExtensionClassLoader> classLoaders = new ArrayList<>();
+
+        // start with the class loader that loaded ExtensionManager, should be WebAppClassLoader for API WAR
+        final ExtensionClassLoader frameworkClassLoader = new ExtensionClassLoader("web-api", new URL[0], this.getClass().getClassLoader());
+        classLoaders.add(frameworkClassLoader);
+
+        // we want to use the system class loader as the parent of the extension class loaders
+        ClassLoader systemClassLoader = FlowPersistenceProvider.class.getClassLoader();
+
+        // add a class loader for each extension dir
+        final Set<String> extensionDirs = properties.getExtensionsDirs();
+        for (final String dir : extensionDirs) {
+            if (!StringUtils.isBlank(dir)) {
+                final ExtensionClassLoader classLoader = createClassLoader(dir, systemClassLoader);
+                if (classLoader != null) {
+                    classLoaders.add(classLoader);
+                }
+            }
+        }
+
+        return classLoaders;
+    }
+
+    /**
+     * Creates a class loader for the given directory of resources.
+     *
+     * @param dir the dir of resources to add to the class loader
+     * @param parentClassLoader the parent class loader
+     * @return a class loader including all of the resources in the given dir, with the specified parent class loader
+     */
+    private ExtensionClassLoader createClassLoader(final String dir, final ClassLoader parentClassLoader) {
+        final File dirFile = new File(dir);
+
+        if (!dirFile.exists()) {
+            LOGGER.warn("Skipping extension directory that does not exist: " + dir);
+            return null;
+        }
+
+        if (!dirFile.canRead()) {
+            LOGGER.warn("Skipping extension directory that can not be read: " + dir);
+            return null;
+        }
+
+        final List<URL> resources = new LinkedList<>();
+
+        try {
+            resources.add(dirFile.toURI().toURL());
+        } catch (final MalformedURLException mfe) {
+            LOGGER.warn("Unable to add {} to classpath due to {}",
+                    new Object[]{ dirFile.getAbsolutePath(), mfe.getMessage()}, mfe);
+        }
+
+        if (dirFile.isDirectory()) {
+            final File[] files = dirFile.listFiles();
+            if (files != null) {
+                for (final File resource : files) {
+                    if (resource.isDirectory()) {
+                        LOGGER.warn("Recursive directories are not supported, skipping " + resource.getAbsolutePath());
+                    } else {
+                        try {
+                            resources.add(resource.toURI().toURL());
+                        } catch (final MalformedURLException mfe) {
+                            LOGGER.warn("Unable to add {} to classpath due to {}",
+                                    new Object[]{ resource.getAbsolutePath(), mfe.getMessage()}, mfe);
+                        }
+                    }
+                }
+            }
+        }
+
+        final URL[] urls = resources.toArray(new URL[resources.size()]);
+        return new ExtensionClassLoader(dir, urls, parentClassLoader);
+    }
+
+    /**
+     * Extend URLClassLoader to keep track of the root directory.
+     */
+    private static class ExtensionClassLoader extends URLClassLoader {
+
+        private final String rootDir;
+
+        public ExtensionClassLoader(final String rootDir, final URL[] urls, final ClassLoader parent) {
+            super(urls, parent);
+            this.rootDir = rootDir;
+        }
+
+        public String getRootDir() {
+            return rootDir;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java
new file mode 100644
index 0000000..a3f3276
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.util.List;
+
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.hook.EventHookProvider;
+
+/**
+ * A factory for obtaining the configured providers.
+ */
+public interface ProviderFactory {
+
+    /**
+     * Initialize the factory.
+     *
+     * @throws ProviderFactoryException if an error occurs during initialization
+     */
+    void initialize() throws ProviderFactoryException;
+
+    /**
+     * @return the configured FlowPersistenceProvider
+     */
+    FlowPersistenceProvider getFlowPersistenceProvider();
+
+    /**
+     * @return the configured FlowHookProviders
+     */
+    List<EventHookProvider> getEventHookProviders();
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java
new file mode 100644
index 0000000..3842b9e
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+/**
+ * An error that occurs while initializing a ProviderFactory.
+ */
+public class ProviderFactoryException extends RuntimeException {
+
+    public ProviderFactoryException() {
+    }
+
+    public ProviderFactoryException(String message) {
+        super(message);
+    }
+
+    public ProviderFactoryException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ProviderFactoryException(Throwable cause) {
+        super(cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
new file mode 100644
index 0000000..8f186fd
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Standard configuration context to be passed to onConfigured method of Providers.
+ */
+public class StandardProviderConfigurationContext implements ProviderConfigurationContext {
+
+    private final Map<String,String> properties;
+
+    public StandardProviderConfigurationContext(final Map<String, String> properties) {
+        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return properties;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
new file mode 100644
index 0000000..65ba914
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.extension.ExtensionManager;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.hook.EventHookProvider;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.apache.nifi.registry.provider.generated.Property;
+import org.apache.nifi.registry.provider.generated.Providers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.xml.sax.SAXException;
+
+import javax.annotation.PostConstruct;
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Standard implementation of ProviderFactory.
+ */
+@Configuration
+public class StandardProviderFactory implements ProviderFactory {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(StandardProviderFactory.class);
+
+    private static final String PROVIDERS_XSD = "/providers.xsd";
+    private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.provider.generated";
+    private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    /**
+     * Load the JAXBContext.
+     */
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_GENERATED_PATH, StandardProviderFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+    private final NiFiRegistryProperties properties;
+    private final ExtensionManager extensionManager;
+    private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null);
+
+    private FlowPersistenceProvider flowPersistenceProvider;
+    private List<EventHookProvider> eventHookProviders;
+
+    @Autowired
+    public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) {
+        this.properties = properties;
+        this.extensionManager = extensionManager;
+
+        if (this.properties == null) {
+            throw new IllegalStateException("NiFiRegistryProperties cannot be null");
+        }
+
+        if (this.extensionManager == null) {
+            throw new IllegalStateException("ExtensionManager cannot be null");
+        }
+    }
+
+    @PostConstruct
+    @Override
+    public synchronized void initialize() throws ProviderFactoryException {
+        if (providersHolder.get() == null) {
+            final File providersConfigFile = properties.getProvidersConfigurationFile();
+            if (providersConfigFile.exists()) {
+                try {
+                    // find the schema
+                    final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+                    final Schema schema = schemaFactory.newSchema(StandardProviderFactory.class.getResource(PROVIDERS_XSD));
+
+                    // attempt to unmarshal
+                    final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+                    unmarshaller.setSchema(schema);
+
+                    // set the holder for later use
+                    final JAXBElement<Providers> element = unmarshaller.unmarshal(new StreamSource(providersConfigFile), Providers.class);
+                    providersHolder.set(element.getValue());
+                } catch (SAXException | JAXBException e) {
+                    throw new ProviderFactoryException("Unable to load the providers configuration file at: " + providersConfigFile.getAbsolutePath(), e);
+                }
+            } else {
+                throw new ProviderFactoryException("Unable to find the providers configuration file at " + providersConfigFile.getAbsolutePath());
+            }
+        }
+    }
+
+    @Bean
+    @Override
+    public synchronized FlowPersistenceProvider getFlowPersistenceProvider() {
+        if (flowPersistenceProvider == null) {
+            if (providersHolder.get() == null) {
+                throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider");
+            }
+
+            final Providers providers = providersHolder.get();
+            final org.apache.nifi.registry.provider.generated.Provider jaxbFlowProvider = providers.getFlowPersistenceProvider();
+            final String flowProviderClassName = jaxbFlowProvider.getClazz();
+
+            try {
+                final ClassLoader classLoader = extensionManager.getExtensionClassLoader(flowProviderClassName);
+                if (classLoader == null) {
+                    throw new IllegalStateException("Extension not found in any of the configured class loaders: " + flowProviderClassName);
+                }
+
+                final Class<?> rawFlowProviderClass = Class.forName(flowProviderClassName, true, classLoader);
+                final Class<? extends FlowPersistenceProvider> flowProviderClass = rawFlowProviderClass.asSubclass(FlowPersistenceProvider.class);
+
+                final Constructor constructor = flowProviderClass.getConstructor();
+                flowPersistenceProvider = (FlowPersistenceProvider) constructor.newInstance();
+
+                LOGGER.info("Instantiated FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName});
+            } catch (Exception e) {
+                throw new ProviderFactoryException("Error creating FlowPersistenceProvider with class name: " + flowProviderClassName, e);
+            }
+
+            final ProviderConfigurationContext configurationContext = createConfigurationContext(jaxbFlowProvider.getProperty());
+            flowPersistenceProvider.onConfigured(configurationContext);
+            LOGGER.info("Configured FlowPersistenceProvider with class name {}", new Object[] {flowProviderClassName});
+        }
+
+        return flowPersistenceProvider;
+    }
+
+    @Bean
+    @Override
+    public List<EventHookProvider> getEventHookProviders() {
+        if (eventHookProviders == null) {
+            eventHookProviders = new ArrayList<>();
+
+            if (providersHolder.get() == null) {
+                throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider");
+            }
+
+            final Providers providers = providersHolder.get();
+            final List<org.apache.nifi.registry.provider.generated.Provider> jaxbHookProvider = providers.getEventHookProvider();
+
+            if(jaxbHookProvider == null || jaxbHookProvider.isEmpty()) {
+                // no hook provided
+                return eventHookProviders;
+            }
+
+            for (org.apache.nifi.registry.provider.generated.Provider hookProvider : jaxbHookProvider) {
+
+                final String hookProviderClassName = hookProvider.getClazz();
+                EventHookProvider hook;
+
+                try {
+                    final ClassLoader classLoader = extensionManager.getExtensionClassLoader(hookProviderClassName);
+                    if (classLoader == null) {
+                        throw new IllegalStateException("Extension not found in any of the configured class loaders: " + hookProviderClassName);
+                    }
+
+                    final Class<?> rawHookProviderClass = Class.forName(hookProviderClassName, true, classLoader);
+                    final Class<? extends EventHookProvider> hookProviderClass = rawHookProviderClass.asSubclass(EventHookProvider.class);
+
+                    final Constructor constructor = hookProviderClass.getConstructor();
+                    hook = (EventHookProvider) constructor.newInstance();
+
+                    LOGGER.info("Instantiated EventHookProvider with class name {}", new Object[] {hookProviderClassName});
+                } catch (Exception e) {
+                    throw new ProviderFactoryException("Error creating EventHookProvider with class name: " + hookProviderClassName, e);
+                }
+
+                final ProviderConfigurationContext configurationContext = createConfigurationContext(hookProvider.getProperty());
+                hook.onConfigured(configurationContext);
+                eventHookProviders.add(hook);
+                LOGGER.info("Configured EventHookProvider with class name {}", new Object[] {hookProviderClassName});
+            }
+        }
+
+        return eventHookProviders;
+    }
+
+    private ProviderConfigurationContext createConfigurationContext(final List<Property> configProperties) {
+        final Map<String,String> properties = new HashMap<>();
+
+        if (configProperties != null) {
+            configProperties.stream().forEach(p -> properties.put(p.getName(), p.getValue()));
+        }
+
+        return new StandardProviderConfigurationContext(properties);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
new file mode 100644
index 0000000..071656d
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * A FlowPersistenceProvider that uses the local filesystem for storage.
+ */
+public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvider {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(FileSystemFlowPersistenceProvider.class);
+
+    static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory";
+
+    static final String SNAPSHOT_EXTENSION = ".snapshot";
+
+    private File flowStorageDir;
+
+    @Override
+    public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        final Map<String,String> props = configurationContext.getProperties();
+        if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided");
+        }
+
+        final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP);
+        if (StringUtils.isBlank(flowStorageDirValue)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank");
+        }
+
+        try {
+            flowStorageDir = new File(flowStorageDirValue);
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir);
+            LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", new Object[] {flowStorageDir.getAbsolutePath()});
+        } catch (IOException e) {
+            throw new ProviderCreationException(e);
+        }
+    }
+
+    @Override
+    public synchronized void saveFlowContent(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException {
+        final File bucketDir = new File(flowStorageDir, context.getBucketId());
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(bucketDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing bucket directory at " + bucketDir.getAbsolutePath(), e);
+        }
+
+        final File flowDir = new File(bucketDir, context.getFlowId());
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing flow directory at " + flowDir.getAbsolutePath(), e);
+        }
+
+        final String versionString = String.valueOf(context.getVersion());
+        final File versionDir = new File(flowDir, versionString);
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(versionDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing version directory at " + versionDir.getAbsolutePath(), e);
+        }
+
+        final File versionFile = new File(versionDir, versionString + SNAPSHOT_EXTENSION);
+        if (versionFile.exists()) {
+            throw new FlowPersistenceException("Unable to save, a snapshot already exists with version " + versionString);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Saving snapshot with filename {}", new Object[] {versionFile.getAbsolutePath()});
+        }
+
+        try (final OutputStream out = new FileOutputStream(versionFile)) {
+            out.write(content);
+            out.flush();
+        } catch (Exception e) {
+            throw new FlowPersistenceException("Unable to write snapshot to disk due to " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public synchronized byte[] getFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Retrieving snapshot with filename {}", new Object[] {snapshotFile.getAbsolutePath()});
+        }
+
+        if (!snapshotFile.exists()) {
+            return null;
+        }
+
+        try (final InputStream in = new FileInputStream(snapshotFile)){
+            return IOUtils.toByteArray(in);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error reading snapshot file: " + snapshotFile.getAbsolutePath(), e);
+        }
+    }
+
+    @Override
+    public synchronized void deleteAllFlowContent(final String bucketId, final String flowId) throws FlowPersistenceException {
+        final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId);
+        if (!flowDir.exists()) {
+            LOGGER.debug("Snapshot directory does not exist at {}", new Object[] {flowDir.getAbsolutePath()});
+            return;
+        }
+
+        // delete everything under the flow directory
+        try {
+            org.apache.commons.io.FileUtils.cleanDirectory(flowDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error deleting snapshots at " + flowDir.getAbsolutePath(), e);
+        }
+
+        // delete the directory for the flow
+        final boolean flowDirDeleted = flowDir.delete();
+        if (!flowDirDeleted) {
+            LOGGER.error("Unable to delete flow directory: " + flowDir.getAbsolutePath());
+        }
+
+        // delete the directory for the bucket if there is nothing left
+        final File bucketDir = new File(flowStorageDir, bucketId);
+        final File[] bucketFiles = bucketDir.listFiles();
+        if (bucketFiles.length == 0) {
+            final boolean deletedBucket = bucketDir.delete();
+            if (!deletedBucket) {
+                LOGGER.error("Unable to delete bucket directory: " + flowDir.getAbsolutePath());
+            }
+        }
+    }
+
+    @Override
+    public synchronized void deleteFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        if (!snapshotFile.exists()) {
+            LOGGER.debug("Snapshot file does not exist at {}", new Object[] {snapshotFile.getAbsolutePath()});
+            return;
+        }
+
+        final boolean deleted = snapshotFile.delete();
+        if (!deleted) {
+            throw new FlowPersistenceException("Unable to delete snapshot at " + snapshotFile.getAbsolutePath());
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Deleted snapshot at {}", new Object[] {snapshotFile.getAbsolutePath()});
+        }
+    }
+
+    protected File getSnapshotFile(final String bucketId, final String flowId, final int version) {
+        final String snapshotFilename = bucketId + "/" + flowId + "/" + version + "/" + version + SNAPSHOT_EXTENSION;
+        return new File(flowStorageDir, snapshotFilename);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
new file mode 100644
index 0000000..1728513
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+
+/**
+ * Standard implementation of FlowSnapshotContext.
+ */
+public class StandardFlowSnapshotContext implements FlowSnapshotContext {
+
+    private final String bucketId;
+    private final String bucketName;
+    private final String flowId;
+    private final String flowName;
+    private final int version;
+    private final String comments;
+    private final String author;
+    private final long snapshotTimestamp;
+
+    private StandardFlowSnapshotContext(final Builder builder) {
+        this.bucketId = builder.bucketId;
+        this.bucketName = builder.bucketName;
+        this.flowId = builder.flowId;
+        this.flowName = builder.flowName;
+        this.version = builder.version;
+        this.comments = builder.comments;
+        this.author = builder.author;
+        this.snapshotTimestamp = builder.snapshotTimestamp;
+
+        Validate.notBlank(bucketId);
+        Validate.notBlank(bucketName);
+        Validate.notBlank(flowId);
+        Validate.notBlank(flowName);
+        Validate.isTrue(version > 0);
+        Validate.isTrue(snapshotTimestamp > 0);
+    }
+
+    @Override
+    public String getBucketId() {
+        return bucketId;
+    }
+
+    @Override
+    public String getBucketName() {
+        return bucketName;
+    }
+
+    @Override
+    public String getFlowId() {
+        return flowId;
+    }
+
+    @Override
+    public String getFlowName() {
+        return flowName;
+    }
+
+    @Override
+    public int getVersion() {
+        return version;
+    }
+
+    @Override
+    public String getComments() {
+        return comments;
+    }
+
+    @Override
+    public long getSnapshotTimestamp() {
+        return snapshotTimestamp;
+    }
+
+    @Override
+    public String getAuthor() {
+        return author;
+    }
+
+    /**
+     * Builder for creating instances of StandardFlowSnapshotContext.
+     */
+    public static class Builder {
+
+        private String bucketId;
+        private String bucketName;
+        private String flowId;
+        private String flowName;
+        private int version;
+        private String comments;
+        private String author;
+        private long snapshotTimestamp;
+
+        public Builder() {
+
+        }
+
+        public Builder(final Bucket bucket, final VersionedFlow versionedFlow, final VersionedFlowSnapshotMetadata snapshotMetadata) {
+            bucketId(bucket.getIdentifier());
+            bucketName(bucket.getName());
+            flowId(snapshotMetadata.getFlowIdentifier());
+            flowName(versionedFlow.getName());
+            version(snapshotMetadata.getVersion());
+            comments(snapshotMetadata.getComments());
+            author(snapshotMetadata.getAuthor());
+            snapshotTimestamp(snapshotMetadata.getTimestamp());
+        }
+
+        public Builder bucketId(final String bucketId) {
+            this.bucketId = bucketId;
+            return this;
+        }
+
+        public Builder bucketName(final String bucketName) {
+            this.bucketName = bucketName;
+            return this;
+        }
+
+        public Builder flowId(final String flowId) {
+            this.flowId = flowId;
+            return this;
+        }
+
+        public Builder flowName(final String flowName) {
+            this.flowName = flowName;
+            return this;
+        }
+
+        public Builder version(final int version) {
+            this.version = version;
+            return this;
+        }
+
+        public Builder comments(final String comments) {
+            this.comments = comments;
+            return this;
+        }
+
+        public Builder author(final String author) {
+            this.author = author;
+            return this;
+        }
+
+        public Builder snapshotTimestamp(final long snapshotTimestamp) {
+            this.snapshotTimestamp = snapshotTimestamp;
+            return this;
+        }
+
+        public StandardFlowSnapshotContext build() {
+            return new StandardFlowSnapshotContext(this);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java
new file mode 100644
index 0000000..3595d84
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+class Bucket {
+    private final String bucketId;
+    private String bucketDirName;
+
+    /**
+     * Flow ID to Flow.
+     */
+    private Map<String, Flow> flows = new HashMap<>();
+
+    public Bucket(String bucketId) {
+        this.bucketId = bucketId;
+    }
+
+    public String getBucketId() {
+        return bucketId;
+    }
+
+    /**
+     * Returns the directory name of this bucket.
+     * @return can be different from original bucket name if it contained sanitized character.
+     */
+    public String getBucketDirName() {
+        return bucketDirName;
+    }
+
+    /**
+     * Set the name of bucket directory.
+     * @param bucketDirName The directory name must be sanitized, use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so.
+     */
+    public void setBucketDirName(String bucketDirName) {
+        this.bucketDirName = bucketDirName;
+    }
+
+    public Flow getFlowOrCreate(String flowId) {
+        return this.flows.computeIfAbsent(flowId, k -> new Flow(flowId));
+    }
+
+    public Optional<Flow> getFlow(String flowId) {
+        return Optional.ofNullable(flows.get(flowId));
+    }
+
+    public void removeFlow(String flowId) {
+        flows.remove(flowId);
+    }
+
+    public boolean isEmpty() {
+        return flows.isEmpty();
+    }
+
+    /**
+     * Serialize the latest version of this Bucket meta data.
+     * @return serialized bucket
+     */
+    Map<String, Object> serialize() {
+        final Map<String, Object> map = new HashMap<>();
+
+        map.put(GitFlowMetaData.LAYOUT_VERSION, GitFlowMetaData.CURRENT_LAYOUT_VERSION);
+        map.put(GitFlowMetaData.BUCKET_ID, bucketId);
+        map.put(GitFlowMetaData.FLOWS,
+                flows.keySet().stream().collect(Collectors.toMap(k -> k, k -> flows.get(k).serialize())));
+
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java
new file mode 100644
index 0000000..1bc7f3f
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+class Flow {
+    /**
+     * The ID of a Flow. It never changes.
+     */
+    private final String flowId;
+
+    /**
+     * A version to a Flow pointer.
+     */
+    private final Map<Integer, FlowPointer> versions = new HashMap<>();
+
+    public Flow(String flowId) {
+        this.flowId = flowId;
+    }
+
+    public boolean hasVersion(int version) {
+        return versions.containsKey(version);
+    }
+
+    public FlowPointer getFlowVersion(int version) {
+        return versions.get(version);
+    }
+
+    public void putVersion(int version, FlowPointer pointer) {
+        versions.put(version, pointer);
+    }
+
+    public static class FlowPointer {
+        private String gitRev;
+        private String objectId;
+        private final String fileName;
+
+        /**
+         * Create new FlowPointer instance.
+         * @param fileName The filename must be sanitized, use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so.
+         */
+        public FlowPointer(String fileName) {
+            this.fileName = fileName;
+        }
+
+        public void setGitRev(String gitRev) {
+            this.gitRev = gitRev;
+        }
+
+        public String getGitRev() {
+            return gitRev;
+        }
+
+        public String getFileName() {
+            return fileName;
+        }
+
+        public String getObjectId() {
+            return objectId;
+        }
+
+        public void setObjectId(String objectId) {
+            this.objectId = objectId;
+        }
+    }
+
+    /**
+     * Serialize the latest version of this Flow meta data.
+     * @return serialized flow
+     */
+    Map<String, Object> serialize() {
+        final Map<String, Object> map = new HashMap<>();
+        final Optional<Integer> latestVerOpt = getLatestVersion();
+        if (!latestVerOpt.isPresent()) {
+            throw new IllegalStateException("Flow version is not added yet, can not be serialized.");
+        }
+        final Integer latestVer = latestVerOpt.get();
+        map.put(GitFlowMetaData.VER, latestVer);
+        map.put(GitFlowMetaData.FILE, versions.get(latestVer).fileName);
+
+        return map;
+    }
+
+    Optional<Integer> getLatestVersion() {
+        return versions.keySet().stream().reduce(Integer::max);
+    }
+
+}