You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2018/09/22 02:11:38 UTC
[39/51] [partial] nifi-registry git commit: NIFIREG-201 Refactoring
project structure to better isolate extensions
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java
new file mode 100644
index 0000000..ec6b9a5
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/FlowSnapshotEntityV1.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.migration;
+
+import java.util.Date;
+import java.util.Objects;
+
+/**
+ * FlowSnapshot DB entity from the original database schema in 0.1.0, used for migration purposes.
+ */
+public class FlowSnapshotEntityV1 {
+
+    // Identity columns: these two values alone drive equals() and hashCode().
+    private String flowId;
+    private Integer version;
+
+    // Descriptive columns: carried along for the migration, ignored by equality.
+    private Date created;
+    private String createdBy;
+    private String comments;
+
+    public String getFlowId() {
+        return this.flowId;
+    }
+
+    public void setFlowId(String flowId) {
+        this.flowId = flowId;
+    }
+
+    public Integer getVersion() {
+        return this.version;
+    }
+
+    public void setVersion(Integer version) {
+        this.version = version;
+    }
+
+    public Date getCreated() {
+        return this.created;
+    }
+
+    public void setCreated(Date created) {
+        this.created = created;
+    }
+
+    public String getCreatedBy() {
+        return this.createdBy;
+    }
+
+    public void setCreatedBy(String createdBy) {
+        this.createdBy = createdBy;
+    }
+
+    public String getComments() {
+        return this.comments;
+    }
+
+    public void setComments(String comments) {
+        this.comments = comments;
+    }
+
+    /**
+     * Two snapshots are equal when both flowId and version match (null-safe comparison).
+     */
+    @Override
+    public boolean equals(Object obj) {
+        // instanceof is false for null, so no separate null check is needed
+        if (obj instanceof FlowSnapshotEntityV1) {
+            final FlowSnapshotEntityV1 that = (FlowSnapshotEntityV1) obj;
+            return Objects.equals(flowId, that.flowId)
+                    && Objects.equals(version, that.version);
+        }
+        return false;
+    }
+
+    /** Hashes the same two fields that equals() compares. */
+    @Override
+    public int hashCode() {
+        return Objects.hash(flowId, version);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java
new file mode 100644
index 0000000..72d3acf
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDataSourceFactory.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.migration;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.h2.jdbcx.JdbcConnectionPool;
+
+import javax.sql.DataSource;
+import java.io.File;
+
+/**
+ * NOTE: This DataSource factory was used in the original 0.1.0 release and remains to migrate data from the old database.
+ * This class is intentionally not a Spring bean, and will be used manually in the custom Flyway migration.
+ */
+public class LegacyDataSourceFactory {
+
+    // one literal serves as both the user name and the password passed to the H2 pool
+    private static final String DB_USERNAME_PASSWORD = "nifireg";
+    private static final int MAX_CONNECTIONS = 5;
+
+    // database file name
+    private static final String DATABASE_FILE_NAME = "nifi-registry";
+
+    private final NiFiRegistryProperties properties;
+
+    // created by the first getDataSource() call (without synchronization) and reused afterwards
+    private JdbcConnectionPool connectionPool;
+
+    public LegacyDataSourceFactory(final NiFiRegistryProperties properties) {
+        this.properties = properties;
+    }
+
+    /** Returns the H2 connection pool for the legacy database, creating it on first use. */
+    public DataSource getDataSource() {
+        if (connectionPool != null) {
+            return connectionPool;
+        }
+
+        connectionPool = JdbcConnectionPool.create(
+                getDatabaseUrl(properties), DB_USERNAME_PASSWORD, DB_USERNAME_PASSWORD);
+        connectionPool.setMaxConnections(MAX_CONNECTIONS);
+        return connectionPool;
+    }
+
+    /** Builds the legacy H2 JDBC URL; throws NullPointerException if no legacy directory is configured. */
+    public static String getDatabaseUrl(final NiFiRegistryProperties properties) {
+        final String legacyDirectory = properties.getLegacyDatabaseDirectory();
+        if (legacyDirectory == null) {
+            throw new NullPointerException("Database directory must be specified.");
+        }
+
+        // the database file handle is <legacy directory>/nifi-registry
+        final File databaseFile = new File(new File(legacyDirectory), DATABASE_FILE_NAME);
+        final StringBuilder url = new StringBuilder("jdbc:h2:")
+                .append(databaseFile)
+                .append(";AUTOCOMMIT=OFF;DB_CLOSE_ON_EXIT=FALSE;LOCK_MODE=3");
+
+        // whatever extra URL text the properties supply is appended verbatim
+        final String urlSuffix = properties.getLegacyDatabaseUrlAppend();
+        if (StringUtils.isNotBlank(urlSuffix)) {
+            url.append(urlSuffix);
+        }
+        return url.toString();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java
new file mode 100644
index 0000000..533fadd
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyDatabaseService.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.migration;
+
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import javax.sql.DataSource;
+import java.util.List;
+
+/**
+ * Service used to load data from original database used in the 0.1.0 release.
+ */
+public class LegacyDatabaseService {
+
+    private final JdbcTemplate jdbcTemplate;
+
+    public LegacyDatabaseService(final DataSource dataSource) {
+        this.jdbcTemplate = new JdbcTemplate(dataSource);
+    }
+
+    /** Loads every row of the legacy bucket table, ordered by name ascending. */
+    public List<BucketEntityV1> getAllBuckets() {
+        return jdbcTemplate.query("SELECT * FROM bucket ORDER BY name ASC", (row, rowNum) -> {
+            final BucketEntityV1 bucket = new BucketEntityV1();
+            bucket.setId(row.getString("ID"));
+            bucket.setName(row.getString("NAME"));
+            bucket.setDescription(row.getString("DESCRIPTION"));
+            bucket.setCreated(row.getTimestamp("CREATED"));
+            return bucket;
+        });
+    }
+
+    /**
+     * Loads every legacy flow by joining the flow table to bucket_item on their shared id.
+     */
+    public List<FlowEntityV1> getAllFlows() {
+        return jdbcTemplate.query("SELECT * FROM flow f, bucket_item item WHERE item.id = f.id", (row, rowNum) -> {
+            final FlowEntityV1 flow = new FlowEntityV1();
+            flow.setId(row.getString("ID"));
+            flow.setName(row.getString("NAME"));
+            flow.setDescription(row.getString("DESCRIPTION"));
+            flow.setCreated(row.getTimestamp("CREATED"));
+            flow.setModified(row.getTimestamp("MODIFIED"));
+            flow.setBucketId(row.getString("BUCKET_ID"));
+            return flow;
+        });
+    }
+
+    /** Loads every row of the legacy flow_snapshot table. */
+    public List<FlowSnapshotEntityV1> getAllFlowSnapshots() {
+        return jdbcTemplate.query("SELECT * FROM flow_snapshot fs", (row, rowNum) -> {
+            final FlowSnapshotEntityV1 snapshot = new FlowSnapshotEntityV1();
+            snapshot.setFlowId(row.getString("FLOW_ID"));
+            // ResultSet.getInt reports SQL NULL as 0, so version is never null here
+            snapshot.setVersion(row.getInt("VERSION"));
+            snapshot.setCreated(row.getTimestamp("CREATED"));
+            snapshot.setCreatedBy(row.getString("CREATED_BY"));
+            snapshot.setComments(row.getString("COMMENTS"));
+            return snapshot;
+        });
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java
new file mode 100644
index 0000000..bf82aae
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/db/migration/LegacyEntityMapper.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.db.migration;
+
+import org.apache.nifi.registry.db.entity.BucketEntity;
+import org.apache.nifi.registry.db.entity.BucketItemEntityType;
+import org.apache.nifi.registry.db.entity.FlowEntity;
+import org.apache.nifi.registry.db.entity.FlowSnapshotEntity;
+
+/**
+ * Utility methods to map legacy DB entities to current DB entities. The initial mappings are
+ * almost a direct translation, but if future changes are made to the tables these methods
+ * will handle the translation from old entity to new entity.
+ */
+public class LegacyEntityMapper {
+    /** Copies id, name, description and created from the legacy bucket into a new BucketEntity. */
+    public static BucketEntity createBucketEntity(final BucketEntityV1 bucketEntityV1) {
+        final BucketEntity mapped = new BucketEntity();
+        mapped.setId(bucketEntityV1.getId());
+        mapped.setName(bucketEntityV1.getName());
+        mapped.setDescription(bucketEntityV1.getDescription());
+        mapped.setCreated(bucketEntityV1.getCreated());
+        return mapped;
+    }
+
+    /** Copies the legacy flow's fields into a new FlowEntity and sets its type to FLOW. */
+    public static FlowEntity createFlowEntity(final FlowEntityV1 flowEntityV1) {
+        final FlowEntity mapped = new FlowEntity();
+        mapped.setId(flowEntityV1.getId());
+        mapped.setName(flowEntityV1.getName());
+        mapped.setDescription(flowEntityV1.getDescription());
+        mapped.setCreated(flowEntityV1.getCreated());
+        mapped.setModified(flowEntityV1.getModified());
+        mapped.setBucketId(flowEntityV1.getBucketId());
+        mapped.setType(BucketItemEntityType.FLOW);
+        return mapped;
+    }
+
+    /** Copies flow id, version, comments, created and createdBy into a new FlowSnapshotEntity. */
+    public static FlowSnapshotEntity createFlowSnapshotEntity(final FlowSnapshotEntityV1 flowSnapshotEntityV1) {
+        final FlowSnapshotEntity mapped = new FlowSnapshotEntity();
+        mapped.setFlowId(flowSnapshotEntityV1.getFlowId());
+        mapped.setVersion(flowSnapshotEntityV1.getVersion());
+        mapped.setComments(flowSnapshotEntityV1.getComments());
+        mapped.setCreated(flowSnapshotEntityV1.getCreated());
+        mapped.setCreatedBy(flowSnapshotEntityV1.getCreatedBy());
+        return mapped;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java
new file mode 100644
index 0000000..b837d6d
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventFactory.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.event;
+
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventFieldName;
+import org.apache.nifi.registry.hook.EventType;
+import org.apache.nifi.registry.security.authorization.user.NiFiUserUtils;
+
+/**
+ * Factory to create Events from domain objects.
+ */
+public class EventFactory {
+
+    // Bucket events carry BUCKET_ID and USER, in that order.
+    public static Event bucketCreated(final Bucket bucket) {
+        return bucketEvent(EventType.CREATE_BUCKET, bucket);
+    }
+
+    public static Event bucketUpdated(final Bucket bucket) {
+        return bucketEvent(EventType.UPDATE_BUCKET, bucket);
+    }
+
+    public static Event bucketDeleted(final Bucket bucket) {
+        return bucketEvent(EventType.DELETE_BUCKET, bucket);
+    }
+
+    // Flow events carry BUCKET_ID, FLOW_ID and USER, in that order.
+    public static Event flowCreated(final VersionedFlow versionedFlow) {
+        return flowEvent(EventType.CREATE_FLOW, versionedFlow);
+    }
+
+    public static Event flowUpdated(final VersionedFlow versionedFlow) {
+        return flowEvent(EventType.UPDATE_FLOW, versionedFlow);
+    }
+
+    public static Event flowDeleted(final VersionedFlow versionedFlow) {
+        return flowEvent(EventType.DELETE_FLOW, versionedFlow);
+    }
+
+    /**
+     * Event for a newly created flow version. USER is taken from the snapshot metadata's author
+     * (not from NiFiUserUtils), and null comments are replaced with an empty string.
+     */
+    public static Event flowVersionCreated(final VersionedFlowSnapshot versionedFlowSnapshot) {
+        String versionComments = versionedFlowSnapshot.getSnapshotMetadata().getComments();
+        if (versionComments == null) {
+            versionComments = "";
+        }
+
+        return new StandardEvent.Builder()
+                .eventType(EventType.CREATE_FLOW_VERSION)
+                .addField(EventFieldName.BUCKET_ID, versionedFlowSnapshot.getSnapshotMetadata().getBucketIdentifier())
+                .addField(EventFieldName.FLOW_ID, versionedFlowSnapshot.getSnapshotMetadata().getFlowIdentifier())
+                .addField(EventFieldName.VERSION, String.valueOf(versionedFlowSnapshot.getSnapshotMetadata().getVersion()))
+                .addField(EventFieldName.USER, versionedFlowSnapshot.getSnapshotMetadata().getAuthor())
+                .addField(EventFieldName.COMMENT, versionComments)
+                .build();
+    }
+
+    // Builds a bucket event of the given type; USER comes from NiFiUserUtils.getNiFiUserIdentity().
+    private static Event bucketEvent(final EventType type, final Bucket bucket) {
+        return new StandardEvent.Builder()
+                .eventType(type)
+                .addField(EventFieldName.BUCKET_ID, bucket.getIdentifier())
+                .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
+                .build();
+    }
+
+    // Builds a flow event of the given type; USER comes from NiFiUserUtils.getNiFiUserIdentity().
+    private static Event flowEvent(final EventType type, final VersionedFlow versionedFlow) {
+        return new StandardEvent.Builder()
+                .eventType(type)
+                .addField(EventFieldName.BUCKET_ID, versionedFlow.getBucketIdentifier())
+                .addField(EventFieldName.FLOW_ID, versionedFlow.getIdentifier())
+                .addField(EventFieldName.USER, NiFiUserUtils.getNiFiUserIdentity())
+                .build();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java
new file mode 100644
index 0000000..8a11493
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/EventService.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.event;
+
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventHookProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service used for publishing events and passing events to the hook providers.
+ */
+@Service
+public class EventService implements DisposableBean {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EventService.class);
+
+    // Should only be a few events in the queue at a time, but setting a capacity just so it isn't unbounded
+    static final int EVENT_QUEUE_SIZE = 10_000;
+
+    private final BlockingQueue<Event> eventQueue;
+    private final ExecutorService scheduledExecutorService;
+    private final List<EventHookProvider> eventHookProviders;
+
+    @Autowired
+    public EventService(final List<EventHookProvider> eventHookProviders) {
+        this.eventQueue = new LinkedBlockingQueue<>(EVENT_QUEUE_SIZE);
+        this.scheduledExecutorService = Executors.newSingleThreadExecutor();
+        this.eventHookProviders = new ArrayList<>(eventHookProviders);
+    }
+
+    /** Starts the single background task that drains the queue and notifies the hook providers. */
+    @PostConstruct
+    public void postConstruct() {
+        LOGGER.info("Starting event consumer...");
+        this.scheduledExecutorService.execute(this::consumeEvents);
+        LOGGER.info("Event consumer started!");
+    }
+
+    /** Shuts the consumer down via shutdownNow(); the consumer loop ends once it observes the interrupt. */
+    @Override
+    public void destroy() throws Exception {
+        LOGGER.info("Shutting down event consumer...");
+        this.scheduledExecutorService.shutdownNow();
+        LOGGER.info("Event consumer shutdown!");
+    }
+
+    /** Queues a valid event without blocking; null events are ignored, invalid or overflow events are logged and dropped. */
+    public void publish(final Event event) {
+        if (event == null) {
+            return;
+        }
+        try {
+            event.validate();
+            if (!eventQueue.offer(event)) {
+                LOGGER.error("Unable to queue event because queue is full");
+            }
+        } catch (IllegalStateException e) {
+            LOGGER.error("Invalid event due to: {}", e.getMessage(), e);
+        }
+    }
+
+    // Consumer loop: polls with a one second timeout and ends once the thread has been interrupted.
+    private void consumeEvents() {
+        try {
+            while (!Thread.interrupted()) {
+                final Event event = eventQueue.poll(1000, TimeUnit.MILLISECONDS);
+                if (event != null) {
+                    notifyProviders(event);
+                }
+            }
+        } catch (InterruptedException e) {
+            LOGGER.warn("Interrupted while polling event queue");
+            Thread.currentThread().interrupt(); // poll() cleared the flag when it threw; restore it
+        }
+    }
+
+    // Hands the event to each provider; untyped events go to all, and failures are contained per provider.
+    private void notifyProviders(final Event event) {
+        for (final EventHookProvider provider : eventHookProviders) {
+            try {
+                if (event.getEventType() == null || provider.shouldHandle(event.getEventType())) {
+                    provider.handle(event);
+                }
+            } catch (Exception e) {
+                LOGGER.error("Error handling event hook", e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java
new file mode 100644
index 0000000..4ad459d
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEvent.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.event;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventField;
+import org.apache.nifi.registry.hook.EventFieldName;
+import org.apache.nifi.registry.hook.EventType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Standard implementation of Event.
+ */
+public class StandardEvent implements Event {
+    private final EventType eventType;
+    private final List<EventField> eventFields;
+
+    // Takes an unmodifiable copy of the builder's fields; a missing event type is rejected by Validate.notNull.
+    private StandardEvent(final Builder builder) {
+        this.eventType = builder.eventType;
+        this.eventFields = Collections.unmodifiableList(builder.eventFields == null
+                ? Collections.emptyList() : new ArrayList<>(builder.eventFields));
+        Validate.notNull(this.eventType);
+    }
+
+    @Override
+    public EventType getEventType() {
+        return eventType;
+    }
+
+    @Override
+    public List<EventField> getFields() {
+        return eventFields;
+    }
+
+    /** Returns the first field with the given name, or null if the name is null or no field matches. */
+    @Override
+    public EventField getField(final EventFieldName fieldName) {
+        if (fieldName == null) {
+            return null;
+        }
+        return eventFields.stream().filter(e -> fieldName.equals(e.getName())).findFirst().orElse(null);
+    }
+
+    /**
+     * Checks that the fields match the event type's declared field names, one for one and in the same order.
+     * @throws IllegalStateException if the number of fields differs or a name is not at its expected position
+     */
+    @Override
+    public void validate() throws IllegalStateException {
+        final int numProvidedFields = eventFields.size();
+        final int numRequiredFields = eventType.getFieldNames().size();
+        // the counts may differ in either direction, so the message does not say "only"
+        if (numProvidedFields != numRequiredFields) {
+            throw new IllegalStateException(numRequiredFields + " fields were required, but " + numProvidedFields + " were provided");
+        }
+
+        for (int i = 0; i < numRequiredFields; i++) {
+            final EventFieldName required = eventType.getFieldNames().get(i);
+            final EventFieldName provided = eventFields.get(i).getName();
+            if (!required.equals(provided)) {
+                throw new IllegalStateException("Expected " + required.name() + ", but found " + provided.name());
+            }
+        }
+    }
+
+    /** Builder for Events; build() fails if no event type has been set. */
+    public static class Builder {
+        private EventType eventType;
+        private List<EventField> eventFields = new ArrayList<>();
+
+        public Builder eventType(final EventType eventType) {
+            this.eventType = eventType;
+            return this;
+        }
+
+        public Builder addField(final EventFieldName name, final String value) {
+            this.eventFields.add(new StandardEventField(name, value));
+            return this;
+        }
+
+        public Builder addField(final EventField arg) {
+            if (arg != null) {
+                this.eventFields.add(arg);
+            }
+            return this;
+        }
+
+        public Builder addFields(final Collection<EventField> fields) {
+            if (fields != null) {
+                this.eventFields.addAll(fields);
+            }
+            return this;
+        }
+
+        public Builder clearFields() {
+            this.eventFields.clear();
+            return this;
+        }
+
+        public Event build() {
+            return new StandardEvent(this);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java
new file mode 100644
index 0000000..21266bb
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/event/StandardEventField.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.event;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.hook.EventField;
+import org.apache.nifi.registry.hook.EventFieldName;
+
+/**
+ * Standard implementation of EventField.
+ */
+public class StandardEventField implements EventField {
+
+    private final EventFieldName name;
+
+    private final String value;
+
+    /** Creates a field; Validate.notNull rejects a null name or a null value. */
+    public StandardEventField(final EventFieldName name, final String value) {
+        // Validate.notNull returns its argument, so each check doubles as the assignment
+        this.name = Validate.notNull(name);
+        this.value = Validate.notNull(value);
+    }
+
+    @Override
+    public EventFieldName getName() {
+        return this.name;
+    }
+
+    @Override
+    public String getValue() {
+        return this.value;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java
new file mode 100644
index 0000000..8f9180c
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/AdministrationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.exception;
+
+/**
+ * Unchecked exception offering the usual no-arg, message, cause, and message-plus-cause constructors.
+ * NOTE(review): the name suggests administrative failures, but no usage is visible here - confirm.
+ */
+public class AdministrationException extends RuntimeException {
+    public AdministrationException() {
+        super();
+    }
+
+    public AdministrationException(String message) {
+        super(message);
+    }
+
+    public AdministrationException(Throwable cause) {
+        super(cause);
+    }
+
+    public AdministrationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
new file mode 100644
index 0000000..a83e9e2
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/exception/ResourceNotFoundException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.exception;
+
+/**
+ * Unchecked exception signalling that a requested entity could not be found.
+ */
+public class ResourceNotFoundException extends RuntimeException {
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public ResourceNotFoundException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public ResourceNotFoundException(final String message) {
+        super(message);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java
new file mode 100644
index 0000000..b24f950
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionCloseable.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.extension;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Temporarily replaces the calling thread's context class loader with the class loader
+ * associated with a component class, and puts the previous loader back on {@link #close()}.
+ *
+ * <p>Because it implements {@link Closeable} it can be used in a try-with-resources statement.</p>
+ */
+public class ExtensionCloseable implements Closeable {
+    /** Context class loader that was installed before the swap; may be {@code null}. */
+    private final ClassLoader toSet;
+
+    private ExtensionCloseable(final ClassLoader toSet) {
+        this.toSet = toSet;
+    }
+
+    /**
+     * Installs the class loader of the given component as the current thread's context class loader.
+     *
+     * <p>The loader is looked up in the given manager by {@code componentClass.getName()}; when the
+     * manager has no entry, the class loader that defined {@code componentClass} is used instead.</p>
+     *
+     * @param manager the extension manager to consult for the component's class loader
+     * @param componentClass the component class whose class loader should become the context class loader
+     * @return a closeable that restores the previous context class loader when closed
+     */
+    public static ExtensionCloseable withComponentClassLoader(final ExtensionManager manager, final Class<?> componentClass) {
+        final Thread thread = Thread.currentThread();
+
+        // capture the loader to restore before anything is changed
+        final ExtensionCloseable closeable = new ExtensionCloseable(thread.getContextClassLoader());
+
+        ClassLoader componentClassLoader = manager.getExtensionClassLoader(componentClass.getName());
+        if (componentClassLoader == null) {
+            componentClassLoader = componentClass.getClassLoader();
+        }
+
+        thread.setContextClassLoader(componentClassLoader);
+        return closeable;
+    }
+
+    /**
+     * Restores the context class loader that was active when this instance was created.
+     *
+     * @throws IOException never thrown by this implementation; declared to keep the existing signature
+     */
+    @Override
+    public void close() throws IOException {
+        // Restore unconditionally: a null previous loader is a legal state, and skipping the
+        // restore in that case would leave the component class loader installed on the thread.
+        Thread.currentThread().setContextClassLoader(toSet);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java
new file mode 100644
index 0000000..ca3259d
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/extension/ExtensionManager.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.extension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.hook.EventHookProvider;
+import org.apache.nifi.registry.security.authentication.IdentityProvider;
+import org.apache.nifi.registry.security.authorization.AccessPolicyProvider;
+import org.apache.nifi.registry.security.authorization.Authorizer;
+import org.apache.nifi.registry.security.authorization.UserGroupProvider;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Discovers extension implementations through {@link ServiceLoader} and remembers which class
+ * loader supplied each implementation class, so that the class can later be loaded by name.
+ */
+@Component
+public class ExtensionManager {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(ExtensionManager.class);
+
+    /** Extension (service) interfaces that are looked up in every class loader. */
+    private static final List<Class<?>> EXTENSION_CLASSES;
+    static {
+        final List<Class<?>> classes = new ArrayList<>();
+        classes.add(FlowPersistenceProvider.class);
+        classes.add(UserGroupProvider.class);
+        classes.add(AccessPolicyProvider.class);
+        classes.add(Authorizer.class);
+        classes.add(IdentityProvider.class);
+        classes.add(EventHookProvider.class);
+        EXTENSION_CLASSES = Collections.unmodifiableList(classes);
+    }
+
+    private final NiFiRegistryProperties properties;
+
+    /** Implementation class name (as returned by {@link Class#getName()}) to the class loader that provided it. */
+    private final Map<String,ExtensionClassLoader> classLoaderMap = new HashMap<>();
+    private final AtomicBoolean loaded = new AtomicBoolean(false);
+
+    @Autowired
+    public ExtensionManager(final NiFiRegistryProperties properties) {
+        this.properties = properties;
+    }
+
+    /**
+     * Scans all class loaders for implementations of the known extension interfaces.
+     * Only the first invocation does any work; later invocations return immediately.
+     */
+    @PostConstruct
+    public synchronized void discoverExtensions() {
+        if (!loaded.get()) {
+            // get the list of class loaders to consider
+            final List<ExtensionClassLoader> classLoaders = getClassLoaders();
+
+            // for each class loader, attempt to load each extension class using the ServiceLoader
+            for (final ExtensionClassLoader extensionClassLoader : classLoaders) {
+                for (final Class<?> extensionClass : EXTENSION_CLASSES) {
+                    loadExtensions(extensionClass, extensionClassLoader);
+                }
+            }
+
+            loaded.set(true);
+        }
+    }
+
+    /**
+     * Returns the class loader that provided the given extension implementation.
+     *
+     * @param canonicalClassName the fully qualified name of the implementation class, in the form returned by
+     *                           {@link Class#getName()} (identical to the canonical name for top-level classes)
+     * @return the class loader the implementation was discovered in, or {@code null} if it was not discovered
+     * @throws IllegalArgumentException if the name is null or blank
+     */
+    public ClassLoader getExtensionClassLoader(final String canonicalClassName) {
+        if (StringUtils.isBlank(canonicalClassName)) {
+            throw new IllegalArgumentException("Class name can not be null");
+        }
+
+        return classLoaderMap.get(canonicalClassName);
+    }
+
+    /**
+     * Loads implementations of the given extension class from the given class loader.
+     *
+     * @param extensionClass the extension/service class
+     * @param extensionClassLoader the class loader to search
+     */
+    private void loadExtensions(final Class<?> extensionClass, final ExtensionClassLoader extensionClassLoader) {
+        final ServiceLoader<?> serviceLoader = ServiceLoader.load(extensionClass, extensionClassLoader);
+        for (final Object o : serviceLoader) {
+            // Key by Class.getName(): callers look entries up with getName() and feed the same string to
+            // Class.forName(), which expects that form. getCanonicalName() differs for nested classes
+            // and is null for anonymous/local classes.
+            final String extensionClassName = o.getClass().getName();
+            final ExtensionClassLoader existing = classLoaderMap.get(extensionClassName);
+            if (existing != null) {
+                LOGGER.warn("Skipping {} from {} which was already found in {}",
+                        extensionClassName, extensionClassLoader.getRootDir(), existing.getRootDir());
+            } else {
+                classLoaderMap.put(extensionClassName, extensionClassLoader);
+            }
+        }
+    }
+
+    /**
+     * Gets all of the class loaders to consider for loading extensions.
+     *
+     * Includes the class loader of the web-app running the framework, plus a class loader for each additional
+     * directory specified in nifi-registry.properties.
+     *
+     * @return a list of extension class loaders
+     */
+    private List<ExtensionClassLoader> getClassLoaders() {
+        final List<ExtensionClassLoader> classLoaders = new ArrayList<>();
+
+        // start with the class loader that loaded ExtensionManager, should be WebAppClassLoader for API WAR
+        final ExtensionClassLoader frameworkClassLoader = new ExtensionClassLoader("web-api", new URL[0], this.getClass().getClassLoader());
+        classLoaders.add(frameworkClassLoader);
+
+        // we want to use the system class loader as the parent of the extension class loaders
+        final ClassLoader systemClassLoader = FlowPersistenceProvider.class.getClassLoader();
+
+        // add a class loader for each extension dir
+        final Set<String> extensionDirs = properties.getExtensionsDirs();
+        for (final String dir : extensionDirs) {
+            if (!StringUtils.isBlank(dir)) {
+                final ExtensionClassLoader classLoader = createClassLoader(dir, systemClassLoader);
+                if (classLoader != null) {
+                    classLoaders.add(classLoader);
+                }
+            }
+        }
+
+        return classLoaders;
+    }
+
+    /**
+     * Creates a class loader for the given directory of resources.
+     *
+     * @param dir the dir of resources to add to the class loader
+     * @param parentClassLoader the parent class loader
+     * @return a class loader including all of the resources in the given dir, with the specified parent class loader,
+     *         or null if the directory does not exist or can not be read
+     */
+    private ExtensionClassLoader createClassLoader(final String dir, final ClassLoader parentClassLoader) {
+        final File dirFile = new File(dir);
+
+        if (!dirFile.exists()) {
+            LOGGER.warn("Skipping extension directory that does not exist: {}", dir);
+            return null;
+        }
+
+        if (!dirFile.canRead()) {
+            LOGGER.warn("Skipping extension directory that can not be read: {}", dir);
+            return null;
+        }
+
+        final List<URL> resources = new LinkedList<>();
+
+        // the directory itself goes first, followed by each regular file directly inside it
+        addResource(resources, dirFile);
+
+        if (dirFile.isDirectory()) {
+            final File[] files = dirFile.listFiles();
+            if (files != null) {
+                for (final File resource : files) {
+                    if (resource.isDirectory()) {
+                        LOGGER.warn("Recursive directories are not supported, skipping {}", resource.getAbsolutePath());
+                    } else {
+                        addResource(resources, resource);
+                    }
+                }
+            }
+        }
+
+        final URL[] urls = resources.toArray(new URL[0]);
+        return new ExtensionClassLoader(dir, urls, parentClassLoader);
+    }
+
+    /**
+     * Appends the URL of the given file to the list, logging a warning (and skipping the file) if it
+     * can not be converted to a URL.
+     *
+     * @param resources the list of class path URLs being built
+     * @param resource the file or directory to add
+     */
+    private static void addResource(final List<URL> resources, final File resource) {
+        try {
+            resources.add(resource.toURI().toURL());
+        } catch (final MalformedURLException mfe) {
+            // pass the arguments directly: wrapping them in an Object[] next to the throwable makes
+            // SLF4J treat the array as the first placeholder value and leaves the second one unfilled
+            LOGGER.warn("Unable to add {} to classpath due to {}", resource.getAbsolutePath(), mfe.getMessage(), mfe);
+        }
+    }
+
+    /**
+     * Extend URLClassLoader to keep track of the root directory.
+     */
+    private static class ExtensionClassLoader extends URLClassLoader {
+
+        private final String rootDir;
+
+        public ExtensionClassLoader(final String rootDir, final URL[] urls, final ClassLoader parent) {
+            super(urls, parent);
+            this.rootDir = rootDir;
+        }
+
+        public String getRootDir() {
+            return rootDir;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java
new file mode 100644
index 0000000..a3f3276
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.util.List;
+
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.hook.EventHookProvider;
+
+/**
+ * A factory for obtaining the configured providers.
+ */
+public interface ProviderFactory {
+
+ /**
+ * Initialize the factory.
+ *
+ * @throws ProviderFactoryException if an error occurs during initialization
+ */
+ void initialize() throws ProviderFactoryException;
+
+ /**
+ * @return the configured FlowPersistenceProvider
+ */
+ FlowPersistenceProvider getFlowPersistenceProvider();
+
+ /**
+ * @return the configured EventHookProviders
+ */
+ List<EventHookProvider> getEventHookProviders();
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java
new file mode 100644
index 0000000..3842b9e
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/ProviderFactoryException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+/**
+ * Unchecked exception describing a failure while initializing a ProviderFactory.
+ *
+ * <p>Every constructor forwards to the {@link RuntimeException} constructor with the same parameters.</p>
+ */
+public class ProviderFactoryException extends RuntimeException {
+
+    /**
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public ProviderFactoryException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public ProviderFactoryException(final String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying cause; the detail message is derived from it by {@link RuntimeException}
+     */
+    public ProviderFactoryException(final Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with neither a detail message nor a cause.
+     */
+    public ProviderFactoryException() {
+        super();
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
new file mode 100644
index 0000000..8f186fd
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderConfigurationContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Standard configuration context to be passed to onConfigured method of Providers.
+ *
+ * <p>Holds a read-only snapshot of the properties supplied at construction time.</p>
+ */
+public class StandardProviderConfigurationContext implements ProviderConfigurationContext {
+
+    private final Map<String,String> properties;
+
+    /**
+     * @param properties the provider properties; they are copied, so later changes to the given map are not reflected
+     */
+    public StandardProviderConfigurationContext(final Map<String, String> properties) {
+        final Map<String, String> snapshot = new HashMap<>(properties);
+        this.properties = Collections.unmodifiableMap(snapshot);
+    }
+
+    /**
+     * @return an unmodifiable view of the copied properties
+     */
+    @Override
+    public Map<String, String> getProperties() {
+        return this.properties;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
new file mode 100644
index 0000000..65ba914
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/StandardProviderFactory.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider;
+
+import org.apache.nifi.registry.extension.ExtensionManager;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.hook.EventHookProvider;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.apache.nifi.registry.provider.generated.Property;
+import org.apache.nifi.registry.provider.generated.Providers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.xml.sax.SAXException;
+
+import javax.annotation.PostConstruct;
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Standard implementation of ProviderFactory.
+ *
+ * <p>{@link #initialize()} unmarshals the providers configuration file (validated against
+ * {@code /providers.xsd}); the getters then instantiate the configured provider classes through the
+ * class loaders known to the {@link ExtensionManager}, configure them, and cache the result.</p>
+ */
+@Configuration
+public class StandardProviderFactory implements ProviderFactory {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(StandardProviderFactory.class);
+
+    private static final String PROVIDERS_XSD = "/providers.xsd";
+    private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.provider.generated";
+    private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    /**
+     * Load the JAXBContext.
+     */
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_GENERATED_PATH, StandardProviderFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+    private final NiFiRegistryProperties properties;
+    private final ExtensionManager extensionManager;
+    private final AtomicReference<Providers> providersHolder = new AtomicReference<>(null);
+
+    // Both caches are only assigned once the provider(s) are fully instantiated and configured.
+    private FlowPersistenceProvider flowPersistenceProvider;
+    private List<EventHookProvider> eventHookProviders;
+
+    /**
+     * @param properties the registry properties, used to locate the providers configuration file
+     * @param extensionManager the manager used to find the class loader of each configured provider class
+     * @throws IllegalStateException if either argument is null
+     */
+    @Autowired
+    public StandardProviderFactory(final NiFiRegistryProperties properties, final ExtensionManager extensionManager) {
+        this.properties = properties;
+        this.extensionManager = extensionManager;
+
+        if (this.properties == null) {
+            throw new IllegalStateException("NiFiRegistryProperties cannot be null");
+        }
+
+        if (this.extensionManager == null) {
+            throw new IllegalStateException("ExtensionManager cannot be null");
+        }
+    }
+
+    /**
+     * Loads the providers configuration file if it has not been loaded yet.
+     *
+     * @throws ProviderFactoryException if the file does not exist or can not be parsed/validated
+     */
+    @PostConstruct
+    @Override
+    public synchronized void initialize() throws ProviderFactoryException {
+        if (providersHolder.get() == null) {
+            final File providersConfigFile = properties.getProvidersConfigurationFile();
+            if (providersConfigFile.exists()) {
+                try {
+                    // find the schema
+                    final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+                    final Schema schema = schemaFactory.newSchema(StandardProviderFactory.class.getResource(PROVIDERS_XSD));
+
+                    // attempt to unmarshal
+                    final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+                    unmarshaller.setSchema(schema);
+
+                    // set the holder for later use
+                    final JAXBElement<Providers> element = unmarshaller.unmarshal(new StreamSource(providersConfigFile), Providers.class);
+                    providersHolder.set(element.getValue());
+                } catch (SAXException | JAXBException e) {
+                    throw new ProviderFactoryException("Unable to load the providers configuration file at: " + providersConfigFile.getAbsolutePath(), e);
+                }
+            } else {
+                throw new ProviderFactoryException("Unable to find the providers configuration file at " + providersConfigFile.getAbsolutePath());
+            }
+        }
+    }
+
+    /**
+     * Returns the configured FlowPersistenceProvider, creating and configuring it on first use.
+     *
+     * @return the configured FlowPersistenceProvider
+     * @throws ProviderFactoryException if the factory has not been initialized or the provider can not be created
+     */
+    @Bean
+    @Override
+    public synchronized FlowPersistenceProvider getFlowPersistenceProvider() {
+        if (flowPersistenceProvider == null) {
+            final Providers providers = requireProviders();
+            final org.apache.nifi.registry.provider.generated.Provider jaxbFlowProvider = providers.getFlowPersistenceProvider();
+            final String flowProviderClassName = jaxbFlowProvider.getClazz();
+
+            final FlowPersistenceProvider provider = createProvider(flowProviderClassName, FlowPersistenceProvider.class);
+            provider.onConfigured(createConfigurationContext(jaxbFlowProvider.getProperty()));
+            LOGGER.info("Configured FlowPersistenceProvider with class name {}", flowProviderClassName);
+
+            // publish only after onConfigured succeeded, so a failure is not masked by a cached,
+            // unconfigured instance on the next call
+            flowPersistenceProvider = provider;
+        }
+
+        return flowPersistenceProvider;
+    }
+
+    /**
+     * Returns the configured EventHookProviders, creating and configuring them on first use.
+     *
+     * @return the configured EventHookProviders; empty if none are configured
+     * @throws ProviderFactoryException if the factory has not been initialized or a provider can not be created
+     */
+    @Bean
+    @Override
+    public synchronized List<EventHookProvider> getEventHookProviders() {
+        if (eventHookProviders == null) {
+            final Providers providers = requireProviders();
+            final List<org.apache.nifi.registry.provider.generated.Provider> jaxbHookProviders = providers.getEventHookProvider();
+
+            // build into a local list; the field is assigned only when every hook was created and
+            // configured, so a failure never leaves an empty or partial list cached
+            final List<EventHookProvider> hooks = new ArrayList<>();
+
+            if (jaxbHookProviders != null) {
+                for (final org.apache.nifi.registry.provider.generated.Provider hookProvider : jaxbHookProviders) {
+                    final String hookProviderClassName = hookProvider.getClazz();
+
+                    final EventHookProvider hook = createProvider(hookProviderClassName, EventHookProvider.class);
+                    hook.onConfigured(createConfigurationContext(hookProvider.getProperty()));
+                    hooks.add(hook);
+                    LOGGER.info("Configured EventHookProvider with class name {}", hookProviderClassName);
+                }
+            }
+
+            eventHookProviders = hooks;
+        }
+
+        return eventHookProviders;
+    }
+
+    /**
+     * @return the unmarshalled providers configuration
+     * @throws ProviderFactoryException if {@link #initialize()} has not completed successfully
+     */
+    private Providers requireProviders() {
+        final Providers providers = providersHolder.get();
+        if (providers == null) {
+            throw new ProviderFactoryException("ProviderFactory must be initialized before obtaining a Provider");
+        }
+        return providers;
+    }
+
+    /**
+     * Instantiates the named provider class through its extension class loader using the no-arg constructor.
+     *
+     * @param className the name of the implementation class
+     * @param providerType the provider interface the implementation must be a subclass of
+     * @param <T> the provider type
+     * @return a new, not yet configured instance
+     * @throws ProviderFactoryException if the class loader is unknown or the class can not be loaded or instantiated
+     */
+    private <T> T createProvider(final String className, final Class<T> providerType) {
+        final String typeName = providerType.getSimpleName();
+        try {
+            final ClassLoader classLoader = extensionManager.getExtensionClassLoader(className);
+            if (classLoader == null) {
+                throw new IllegalStateException("Extension not found in any of the configured class loaders: " + className);
+            }
+
+            final Class<?> rawProviderClass = Class.forName(className, true, classLoader);
+            final Class<? extends T> providerClass = rawProviderClass.asSubclass(providerType);
+
+            final Constructor<? extends T> constructor = providerClass.getConstructor();
+            final T provider = constructor.newInstance();
+
+            LOGGER.info("Instantiated {} with class name {}", typeName, className);
+            return provider;
+        } catch (Exception e) {
+            throw new ProviderFactoryException("Error creating " + typeName + " with class name: " + className, e);
+        }
+    }
+
+    /**
+     * Converts the JAXB property elements of a provider into a configuration context.
+     *
+     * @param configProperties the configured properties, may be null
+     * @return a context holding the name/value pairs (empty when no properties are given)
+     */
+    private ProviderConfigurationContext createConfigurationContext(final List<Property> configProperties) {
+        final Map<String,String> configured = new HashMap<>();
+
+        if (configProperties != null) {
+            configProperties.forEach(p -> configured.put(p.getName(), p.getValue()));
+        }
+
+        return new StandardProviderConfigurationContext(configured);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
new file mode 100644
index 0000000..071656d
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/FileSystemFlowPersistenceProvider.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * A FlowPersistenceProvider that uses the local filesystem for storage.
+ *
+ * <p>Each snapshot is stored as a single file located at
+ * {@code <Flow Storage Directory>/<bucketId>/<flowId>/<version>/<version>.snapshot}.
+ * The save, get and delete operations are synchronized on this instance.</p>
+ */
+public class FileSystemFlowPersistenceProvider implements FlowPersistenceProvider {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(FileSystemFlowPersistenceProvider.class);
+
+    /** Name of the configuration property that holds the root storage directory. */
+    static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory";
+
+    /** File extension appended to the version number to form the snapshot filename. */
+    static final String SNAPSHOT_EXTENSION = ".snapshot";
+
+    /** Root directory under which all bucket directories are created; set by {@link #onConfigured}. */
+    private File flowStorageDir;
+
+    /**
+     * Reads the storage directory from the configuration and makes sure it exists and is
+     * readable and writable.
+     *
+     * @param configurationContext the context providing the provider properties
+     * @throws ProviderCreationException if the storage directory property is missing or blank,
+     *         or if the directory cannot be created or accessed
+     */
+    @Override
+    public void onConfigured(final ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        final Map<String,String> props = configurationContext.getProperties();
+        if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided");
+        }
+
+        final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP);
+        if (StringUtils.isBlank(flowStorageDirValue)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank");
+        }
+
+        try {
+            flowStorageDir = new File(flowStorageDirValue);
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(flowStorageDir);
+            LOGGER.info("Configured FileSystemFlowPersistenceProvider with Flow Storage Directory {}", flowStorageDir.getAbsolutePath());
+        } catch (IOException e) {
+            throw new ProviderCreationException(e);
+        }
+    }
+
+    /**
+     * Writes the given content to the snapshot file for the bucket, flow and version described
+     * by the context, creating the bucket, flow and version directories as needed.
+     *
+     * @param context the context identifying the bucket, flow and version being saved
+     * @param content the serialized snapshot to write
+     * @throws FlowPersistenceException if a directory cannot be accessed, if a snapshot file
+     *         already exists for the version, or if writing the file fails
+     */
+    @Override
+    public synchronized void saveFlowContent(final FlowSnapshotContext context, final byte[] content) throws FlowPersistenceException {
+        final File bucketDir = new File(flowStorageDir, context.getBucketId());
+        ensureDirectory(bucketDir, "bucket");
+
+        final File flowDir = new File(bucketDir, context.getFlowId());
+        ensureDirectory(flowDir, "flow");
+
+        final String versionString = String.valueOf(context.getVersion());
+        final File versionDir = new File(flowDir, versionString);
+        ensureDirectory(versionDir, "version");
+
+        // existing snapshots are never overwritten
+        final File versionFile = new File(versionDir, versionString + SNAPSHOT_EXTENSION);
+        if (versionFile.exists()) {
+            throw new FlowPersistenceException("Unable to save, a snapshot already exists with version " + versionString);
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Saving snapshot with filename {}", versionFile.getAbsolutePath());
+        }
+
+        try (final OutputStream out = new FileOutputStream(versionFile)) {
+            out.write(content);
+            out.flush();
+        } catch (Exception e) {
+            throw new FlowPersistenceException("Unable to write snapshot to disk due to " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Reads the snapshot file for the given bucket, flow and version.
+     *
+     * @param bucketId the bucket identifier
+     * @param flowId the flow identifier
+     * @param version the snapshot version
+     * @return the bytes of the snapshot file, or null if the file does not exist
+     * @throws FlowPersistenceException if the file exists but cannot be read
+     */
+    @Override
+    public synchronized byte[] getFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Retrieving snapshot with filename {}", snapshotFile.getAbsolutePath());
+        }
+
+        if (!snapshotFile.exists()) {
+            return null;
+        }
+
+        try (final InputStream in = new FileInputStream(snapshotFile)) {
+            return IOUtils.toByteArray(in);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error reading snapshot file: " + snapshotFile.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Deletes every snapshot of the given flow, the flow directory itself, and the bucket
+     * directory when nothing is left in it. Does nothing if the flow directory does not exist.
+     *
+     * @param bucketId the bucket identifier
+     * @param flowId the flow identifier
+     * @throws FlowPersistenceException if the contents of the flow directory cannot be deleted
+     */
+    @Override
+    public synchronized void deleteAllFlowContent(final String bucketId, final String flowId) throws FlowPersistenceException {
+        final File flowDir = new File(flowStorageDir, bucketId + "/" + flowId);
+        if (!flowDir.exists()) {
+            LOGGER.debug("Snapshot directory does not exist at {}", flowDir.getAbsolutePath());
+            return;
+        }
+
+        // delete everything under the flow directory
+        try {
+            org.apache.commons.io.FileUtils.cleanDirectory(flowDir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error deleting snapshots at " + flowDir.getAbsolutePath(), e);
+        }
+
+        // delete the directory for the flow
+        final boolean flowDirDeleted = flowDir.delete();
+        if (!flowDirDeleted) {
+            LOGGER.error("Unable to delete flow directory: {}", flowDir.getAbsolutePath());
+        }
+
+        // delete the directory for the bucket if there is nothing left
+        final File bucketDir = new File(flowStorageDir, bucketId);
+        final File[] bucketFiles = bucketDir.listFiles();
+        if (bucketFiles == null) {
+            // File.listFiles() returns null when the path is not a directory or an I/O error occurs;
+            // the flow content is already gone, so only report that the bucket could not be inspected
+            LOGGER.error("Unable to list contents of bucket directory: {}", bucketDir.getAbsolutePath());
+            return;
+        }
+
+        if (bucketFiles.length == 0) {
+            final boolean deletedBucket = bucketDir.delete();
+            if (!deletedBucket) {
+                LOGGER.error("Unable to delete bucket directory: {}", bucketDir.getAbsolutePath());
+            }
+        }
+    }
+
+    /**
+     * Deletes the snapshot file for the given bucket, flow and version. Does nothing if the
+     * file does not exist.
+     *
+     * @param bucketId the bucket identifier
+     * @param flowId the flow identifier
+     * @param version the snapshot version
+     * @throws FlowPersistenceException if the file exists but cannot be deleted
+     */
+    @Override
+    public synchronized void deleteFlowContent(final String bucketId, final String flowId, final int version) throws FlowPersistenceException {
+        final File snapshotFile = getSnapshotFile(bucketId, flowId, version);
+        if (!snapshotFile.exists()) {
+            LOGGER.debug("Snapshot file does not exist at {}", snapshotFile.getAbsolutePath());
+            return;
+        }
+
+        final boolean deleted = snapshotFile.delete();
+        if (!deleted) {
+            throw new FlowPersistenceException("Unable to delete snapshot at " + snapshotFile.getAbsolutePath());
+        }
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Deleted snapshot at {}", snapshotFile.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Resolves the snapshot file for the given bucket, flow and version beneath the storage directory.
+     *
+     * @param bucketId the bucket identifier
+     * @param flowId the flow identifier
+     * @param version the snapshot version
+     * @return the file at {@code bucketId/flowId/version/version.snapshot}; it may not exist
+     */
+    protected File getSnapshotFile(final String bucketId, final String flowId, final int version) {
+        final String snapshotFilename = bucketId + "/" + flowId + "/" + version + "/" + version + SNAPSHOT_EXTENSION;
+        return new File(flowStorageDir, snapshotFilename);
+    }
+
+    /**
+     * Ensures the given directory exists and is readable and writable, translating any failure
+     * into a FlowPersistenceException.
+     *
+     * @param dir the directory to check or create
+     * @param description the kind of directory ("bucket", "flow" or "version"), used in the error message
+     * @throws FlowPersistenceException if the directory cannot be created or accessed
+     */
+    private void ensureDirectory(final File dir, final String description) throws FlowPersistenceException {
+        try {
+            FileUtils.ensureDirectoryExistAndCanReadAndWrite(dir);
+        } catch (IOException e) {
+            throw new FlowPersistenceException("Error accessing " + description + " directory at " + dir.getAbsolutePath(), e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
new file mode 100644
index 0000000..1728513
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/StandardFlowSnapshotContext.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+
+/**
+ * Standard implementation of FlowSnapshotContext.
+ *
+ * <p>Instances are immutable and are created through the nested {@link Builder}.</p>
+ */
+public class StandardFlowSnapshotContext implements FlowSnapshotContext {
+
+    private final String bucketId;
+    private final String bucketName;
+    private final String flowId;
+    private final String flowName;
+    private final int version;
+    private final String comments;
+    private final String author;
+    private final long snapshotTimestamp;
+
+    /**
+     * Copies the builder state into the new context and validates it: the bucket id, bucket
+     * name, flow id and flow name must not be blank, and the version and snapshot timestamp
+     * must be greater than zero. Comments and author are not validated.
+     *
+     * @param builder the builder holding the values for the new context
+     */
+    private StandardFlowSnapshotContext(final Builder builder) {
+        // Validate.notBlank returns its argument, so each identifier is checked as it is assigned
+        this.bucketId = Validate.notBlank(builder.bucketId);
+        this.bucketName = Validate.notBlank(builder.bucketName);
+        this.flowId = Validate.notBlank(builder.flowId);
+        this.flowName = Validate.notBlank(builder.flowName);
+
+        this.version = builder.version;
+        Validate.isTrue(this.version > 0);
+
+        this.comments = builder.comments;
+        this.author = builder.author;
+
+        this.snapshotTimestamp = builder.snapshotTimestamp;
+        Validate.isTrue(this.snapshotTimestamp > 0);
+    }
+
+    @Override
+    public String getBucketId() {
+        return this.bucketId;
+    }
+
+    @Override
+    public String getBucketName() {
+        return this.bucketName;
+    }
+
+    @Override
+    public String getFlowId() {
+        return this.flowId;
+    }
+
+    @Override
+    public String getFlowName() {
+        return this.flowName;
+    }
+
+    @Override
+    public int getVersion() {
+        return this.version;
+    }
+
+    @Override
+    public String getComments() {
+        return this.comments;
+    }
+
+    @Override
+    public long getSnapshotTimestamp() {
+        return this.snapshotTimestamp;
+    }
+
+    @Override
+    public String getAuthor() {
+        return this.author;
+    }
+
+    /**
+     * Builder for creating instances of StandardFlowSnapshotContext.
+     *
+     * <p>Values are only validated when {@link #build()} is called.</p>
+     */
+    public static class Builder {
+
+        private String bucketId;
+        private String bucketName;
+        private String flowId;
+        private String flowName;
+        private int version;
+        private String comments;
+        private String author;
+        private long snapshotTimestamp;
+
+        /**
+         * Creates an empty builder; every value has to be supplied through the setter methods.
+         */
+        public Builder() {
+
+        }
+
+        /**
+         * Creates a builder pre-populated from the given bucket, flow and snapshot metadata.
+         *
+         * @param bucket supplies the bucket id and bucket name
+         * @param versionedFlow supplies the flow name
+         * @param snapshotMetadata supplies the flow id, version, comments, author and timestamp
+         */
+        public Builder(final Bucket bucket, final VersionedFlow versionedFlow, final VersionedFlowSnapshotMetadata snapshotMetadata) {
+            this.bucketId(bucket.getIdentifier());
+            this.bucketName(bucket.getName());
+            this.flowId(snapshotMetadata.getFlowIdentifier());
+            this.flowName(versionedFlow.getName());
+            this.version(snapshotMetadata.getVersion());
+            this.comments(snapshotMetadata.getComments());
+            this.author(snapshotMetadata.getAuthor());
+            this.snapshotTimestamp(snapshotMetadata.getTimestamp());
+        }
+
+        /**
+         * @param bucketId the identifier of the bucket
+         * @return this builder
+         */
+        public Builder bucketId(final String bucketId) {
+            this.bucketId = bucketId;
+            return this;
+        }
+
+        /**
+         * @param bucketName the name of the bucket
+         * @return this builder
+         */
+        public Builder bucketName(final String bucketName) {
+            this.bucketName = bucketName;
+            return this;
+        }
+
+        /**
+         * @param flowId the identifier of the flow
+         * @return this builder
+         */
+        public Builder flowId(final String flowId) {
+            this.flowId = flowId;
+            return this;
+        }
+
+        /**
+         * @param flowName the name of the flow
+         * @return this builder
+         */
+        public Builder flowName(final String flowName) {
+            this.flowName = flowName;
+            return this;
+        }
+
+        /**
+         * @param version the snapshot version
+         * @return this builder
+         */
+        public Builder version(final int version) {
+            this.version = version;
+            return this;
+        }
+
+        /**
+         * @param comments the comments of the snapshot
+         * @return this builder
+         */
+        public Builder comments(final String comments) {
+            this.comments = comments;
+            return this;
+        }
+
+        /**
+         * @param author the author of the snapshot
+         * @return this builder
+         */
+        public Builder author(final String author) {
+            this.author = author;
+            return this;
+        }
+
+        /**
+         * @param snapshotTimestamp the timestamp of the snapshot
+         * @return this builder
+         */
+        public Builder snapshotTimestamp(final long snapshotTimestamp) {
+            this.snapshotTimestamp = snapshotTimestamp;
+            return this;
+        }
+
+        /**
+         * Creates the context from the values collected so far.
+         *
+         * @return a new StandardFlowSnapshotContext
+         */
+        public StandardFlowSnapshotContext build() {
+            return new StandardFlowSnapshotContext(this);
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java
new file mode 100644
index 0000000..3595d84
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Bucket.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * In-memory representation of a bucket: its id, the name of its directory and the
+ * flows it contains.
+ */
+class Bucket {
+    private final String bucketId;
+    private String bucketDirName;
+
+    /**
+     * Flow ID to Flow.
+     */
+    private final Map<String, Flow> flows = new HashMap<>();
+
+    public Bucket(String bucketId) {
+        this.bucketId = bucketId;
+    }
+
+    public String getBucketId() {
+        return bucketId;
+    }
+
+    /**
+     * Returns the directory name of this bucket.
+     * @return the directory name; it can differ from the original bucket name if that name contained characters that were sanitized.
+     */
+    public String getBucketDirName() {
+        return bucketDirName;
+    }
+
+    /**
+     * Set the name of bucket directory.
+     * @param bucketDirName The directory name must be sanitized, use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so.
+     */
+    public void setBucketDirName(String bucketDirName) {
+        this.bucketDirName = bucketDirName;
+    }
+
+    /**
+     * Returns the flow registered under the given id, registering a new empty Flow first if there is none.
+     * @param flowId the flow id
+     * @return the existing or newly created flow
+     */
+    public Flow getFlowOrCreate(String flowId) {
+        return flows.computeIfAbsent(flowId, Flow::new);
+    }
+
+    /**
+     * Looks up a flow without creating it.
+     * @param flowId the flow id
+     * @return the flow, or an empty Optional if no flow is registered under the id
+     */
+    public Optional<Flow> getFlow(String flowId) {
+        final Flow existing = flows.get(flowId);
+        return Optional.ofNullable(existing);
+    }
+
+    public void removeFlow(String flowId) {
+        flows.remove(flowId);
+    }
+
+    /**
+     * @return true if this bucket holds no flows
+     */
+    public boolean isEmpty() {
+        return flows.isEmpty();
+    }
+
+    /**
+     * Serialize the latest version of this Bucket meta data.
+     * @return serialized bucket: the layout version, the bucket id and a map of flow id to serialized flow
+     */
+    Map<String, Object> serialize() {
+        final Map<String, Object> serializedFlows = flows.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().serialize()));
+
+        final Map<String, Object> serialized = new HashMap<>();
+        serialized.put(GitFlowMetaData.LAYOUT_VERSION, GitFlowMetaData.CURRENT_LAYOUT_VERSION);
+        serialized.put(GitFlowMetaData.BUCKET_ID, bucketId);
+        serialized.put(GitFlowMetaData.FLOWS, serializedFlows);
+        return serialized;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java
new file mode 100644
index 0000000..1bc7f3f
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/Flow.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * In-memory representation of a flow: its id and a pointer for each known version.
+ */
+class Flow {
+    /**
+     * The ID of a Flow. It never changes.
+     */
+    private final String flowId;
+
+    /**
+     * A version to a Flow pointer.
+     */
+    private final Map<Integer, FlowPointer> versions = new HashMap<>();
+
+    public Flow(String flowId) {
+        this.flowId = flowId;
+    }
+
+    /**
+     * @param version the version to look for
+     * @return true if a pointer is registered for the version
+     */
+    public boolean hasVersion(int version) {
+        return versions.containsKey(version);
+    }
+
+    /**
+     * @param version the version to look up
+     * @return the pointer registered for the version, or null if there is none
+     */
+    public FlowPointer getFlowVersion(int version) {
+        return versions.get(version);
+    }
+
+    /**
+     * Registers the pointer for a version, replacing any pointer previously registered for it.
+     * @param version the version
+     * @param pointer the pointer for that version
+     */
+    public void putVersion(int version, FlowPointer pointer) {
+        versions.put(version, pointer);
+    }
+
+    /**
+     * Holds the file name of one flow version together with its git revision and object id.
+     */
+    public static class FlowPointer {
+        private String gitRev;
+        private String objectId;
+        private final String fileName;
+
+        /**
+         * Create new FlowPointer instance.
+         * @param fileName The filename must be sanitized, use {@link org.apache.nifi.registry.util.FileUtils#sanitizeFilename(String)} to do so.
+         */
+        public FlowPointer(String fileName) {
+            this.fileName = fileName;
+        }
+
+        public String getFileName() {
+            return fileName;
+        }
+
+        public String getGitRev() {
+            return gitRev;
+        }
+
+        public void setGitRev(String gitRev) {
+            this.gitRev = gitRev;
+        }
+
+        public String getObjectId() {
+            return objectId;
+        }
+
+        public void setObjectId(String objectId) {
+            this.objectId = objectId;
+        }
+    }
+
+    /**
+     * Serialize the latest version of this Flow meta data.
+     * @return serialized flow, holding the latest version number and the file name of that version
+     * @throws IllegalStateException if no version has been added to this flow
+     */
+    Map<String, Object> serialize() {
+        final Integer newest = getLatestVersion().orElseThrow(
+                () -> new IllegalStateException("Flow version is not added yet, can not be serialized."));
+
+        final Map<String, Object> serialized = new HashMap<>();
+        serialized.put(GitFlowMetaData.VER, newest);
+        serialized.put(GitFlowMetaData.FILE, versions.get(newest).fileName);
+        return serialized;
+    }
+
+    /**
+     * @return the highest version number registered, or an empty Optional if there is none
+     */
+    Optional<Integer> getLatestVersion() {
+        return versions.keySet().stream().max(Integer::compare);
+    }
+
+}