You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2018/09/22 02:11:37 UTC

[38/51] [partial] nifi-registry git commit: NIFIREG-201 Refactoring project structure to better isolate extensions

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
new file mode 100644
index 0000000..4faf007
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.PushCommand;
+import org.eclipse.jgit.api.Status;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.api.errors.NoHeadException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.storage.file.FileRepositoryBuilder;
+import org.eclipse.jgit.transport.CredentialsProvider;
+import org.eclipse.jgit.transport.PushResult;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+class GitFlowMetaData {
+
+    static final int CURRENT_LAYOUT_VERSION = 1;
+
+    static final String LAYOUT_VERSION = "layoutVer";
+    static final String BUCKET_ID = "bucketId";
+    static final String FLOWS = "flows";
+    static final String VER = "ver";
+    static final String FILE = "file";
+    static final String BUCKET_FILENAME = "bucket.yml";
+
+    private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class);
+
+    private Repository gitRepo;
+    private String remoteToPush;
+    private CredentialsProvider credentialsProvider;
+
+    private final BlockingQueue<Long> pushQueue = new ArrayBlockingQueue<>(1);
+
+    /**
+     * Bucket ID to Bucket.
+     */
+    private Map<String, Bucket> buckets = new HashMap<>();
+
+    public void setRemoteToPush(String remoteToPush) {
+        this.remoteToPush = remoteToPush;
+    }
+
+    public void setRemoteCredential(String userName, String password) {
+        this.credentialsProvider = new UsernamePasswordCredentialsProvider(userName, password);
+    }
+
+    /**
+     * Open a Git repository using the specified directory.
+     * @param gitProjectRootDir a root directory of a Git project
+     * @return created Repository
+     * @throws IOException thrown when the specified directory does not exist,
+     * does not have read/write privilege or not containing .git directory
+     */
+    private Repository openRepository(final File gitProjectRootDir) throws IOException {
+
+        // Instead of using FileUtils.ensureDirectoryExistAndCanReadAndWrite, check availability manually here.
+        // Because the util will try to create a dir if not exist.
+        // The git dir should be initialized and configured by users.
+        if (!gitProjectRootDir.isDirectory()) {
+            throw new IOException(format("'%s' is not a directory or does not exist.", gitProjectRootDir));
+        }
+
+        if (!(gitProjectRootDir.canRead() && gitProjectRootDir.canWrite())) {
+            throw new IOException(format("Directory '%s' does not have read/write privilege.", gitProjectRootDir));
+        }
+
+        // Search .git dir but avoid searching parent directories.
+        final FileRepositoryBuilder builder = new FileRepositoryBuilder()
+                .readEnvironment()
+                .setMustExist(true)
+                .addCeilingDirectory(gitProjectRootDir)
+                .findGitDir(gitProjectRootDir);
+
+        if (builder.getGitDir() == null) {
+            throw new IOException(format("Directory '%s' does not contain a .git directory." +
+                    " Please init and configure the directory with 'git init' command before using it from NiFi Registry.",
+                    gitProjectRootDir));
+        }
+
+        return builder.build();
+    }
+
+    @SuppressWarnings("unchecked")
+    public void loadGitRepository(File gitProjectRootDir) throws IOException, GitAPIException {
+        gitRepo = openRepository(gitProjectRootDir);
+
+        try (final Git git = new Git(gitRepo)) {
+
+            // Check if remote exists.
+            if (!isEmpty(remoteToPush)) {
+                final List<RemoteConfig> remotes = git.remoteList().call();
+                final boolean isRemoteExist = remotes.stream().anyMatch(remote -> remote.getName().equals(remoteToPush));
+                if (!isRemoteExist) {
+                    final List<String> remoteNames = remotes.stream().map(RemoteConfig::getName).collect(Collectors.toList());
+                    throw new IllegalArgumentException(
+                            format("The configured remote '%s' to push does not exist. Available remotes are %s", remoteToPush, remoteNames));
+                }
+            }
+
+            boolean isLatestCommit = true;
+            try {
+                for (RevCommit commit : git.log().call()) {
+                    final String shortCommitId = commit.getId().abbreviate(7).name();
+                    logger.debug("Processing a commit: {}", shortCommitId);
+                    final RevTree tree = commit.getTree();
+
+                    try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) {
+                        treeWalk.addTree(tree);
+
+                        // Path -> ObjectId
+                        final Map<String, ObjectId> bucketObjectIds = new HashMap<>();
+                        final Map<String, ObjectId> flowSnapshotObjectIds = new HashMap<>();
+                        while (treeWalk.next()) {
+                            if (treeWalk.isSubtree()) {
+                                treeWalk.enterSubtree();
+                            } else {
+                                final String pathString = treeWalk.getPathString();
+                                // TODO: what is this nth?? When does it get grater than 0? Tree count seems to be always 1..
+                                if (pathString.endsWith("/" + BUCKET_FILENAME)) {
+                                    bucketObjectIds.put(pathString, treeWalk.getObjectId(0));
+                                } else if (pathString.endsWith(GitFlowPersistenceProvider.SNAPSHOT_EXTENSION)) {
+                                    flowSnapshotObjectIds.put(pathString, treeWalk.getObjectId(0));
+                                }
+                            }
+                        }
+
+                        if (bucketObjectIds.isEmpty()) {
+                            // No bucket.yml means at this point, all flows are deleted. No need to scan older commits because those are already deleted.
+                            logger.debug("Tree at commit {} does not contain any " + BUCKET_FILENAME + ". Stop loading commits here.", shortCommitId);
+                            return;
+                        }
+
+                        loadBuckets(gitRepo, commit, isLatestCommit, bucketObjectIds, flowSnapshotObjectIds);
+                        isLatestCommit = false;
+                    }
+                }
+            } catch (NoHeadException e) {
+                logger.debug("'{}' does not have any commit yet. Starting with empty buckets.", gitProjectRootDir);
+            }
+
+        }
+    }
+
+    void startPushThread() {
+        // If successfully loaded, start pushing thread if necessary.
+        if (isEmpty(remoteToPush)) {
+            return;
+        }
+
+        final ThreadFactory threadFactory = new BasicThreadFactory.Builder()
+                .daemon(true).namingPattern(getClass().getSimpleName() + " Push thread").build();
+
+        // Use scheduled fixed delay to control the minimum interval between push activities.
+        // The necessity of executing push is controlled by offering messages to the pushQueue.
+        // If multiple commits are made within this time window, those are pushed by a single push execution.
+        final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+        executorService.scheduleWithFixedDelay(() -> {
+
+            final Long offeredTimestamp;
+            try {
+                offeredTimestamp = pushQueue.take();
+            } catch (InterruptedException e) {
+                logger.warn("Waiting for push request has been interrupted due to {}", e.getMessage(), e);
+                return;
+            }
+
+            logger.debug("Took a push request sent at {} to {}...", offeredTimestamp, remoteToPush);
+            final PushCommand pushCommand = new Git(gitRepo).push().setRemote(remoteToPush);
+            if (credentialsProvider != null) {
+                pushCommand.setCredentialsProvider(credentialsProvider);
+            }
+
+            try {
+                final Iterable<PushResult> pushResults = pushCommand.call();
+                for (PushResult pushResult : pushResults) {
+                    logger.debug(pushResult.getMessages());
+                }
+            } catch (GitAPIException e) {
+                logger.error(format("Failed to push commits to %s due to %s", remoteToPush, e), e);
+            }
+
+        }, 10, 10, TimeUnit.SECONDS);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void loadBuckets(Repository gitRepo, RevCommit commit, boolean isLatestCommit, Map<String, ObjectId> bucketObjectIds, Map<String, ObjectId> flowSnapshotObjectIds) throws IOException {
+        final Yaml yaml = new Yaml();
+        for (String bucketFilePath : bucketObjectIds.keySet()) {
+            final ObjectId bucketObjectId = bucketObjectIds.get(bucketFilePath);
+            final Map<String, Object> bucketMeta;
+            try (InputStream bucketIn = gitRepo.newObjectReader().open(bucketObjectId).openStream()) {
+                bucketMeta = yaml.load(bucketIn);
+            }
+
+            if (!validateRequiredValue(bucketMeta, bucketFilePath, LAYOUT_VERSION, BUCKET_ID, FLOWS)) {
+                continue;
+            }
+
+            int layoutVersion = (int) bucketMeta.get(LAYOUT_VERSION);
+            if (layoutVersion > CURRENT_LAYOUT_VERSION) {
+                logger.warn("{} has unsupported {} {}. This Registry can only support {} or lower. Skipping it.",
+                        bucketFilePath, LAYOUT_VERSION, layoutVersion, CURRENT_LAYOUT_VERSION);
+                continue;
+            }
+
+            final String bucketId = (String) bucketMeta.get(BUCKET_ID);
+
+            final Bucket bucket;
+            if (isLatestCommit) {
+                // If this is the latest commit, then create one.
+                bucket = getBucketOrCreate(bucketId);
+            } else {
+                // Otherwise non-existing bucket means it's already deleted.
+                final Optional<Bucket> bucketOpt = getBucket(bucketId);
+                if (bucketOpt.isPresent()) {
+                    bucket = bucketOpt.get();
+                } else {
+                    logger.debug("Bucket {} does not exist any longer. It may have been deleted.", bucketId);
+                    continue;
+                }
+            }
+
+            // Since the bucketName is restored from pathname, it can be different from the original bucket name when it sanitized.
+            final String bucketDirName = bucketFilePath.substring(0, bucketFilePath.lastIndexOf("/"));
+
+            // Since commits are read in LIFO order, avoid old commits overriding the latest bucket name.
+            if (isEmpty(bucket.getBucketDirName())) {
+                bucket.setBucketDirName(bucketDirName);
+            }
+
+            final Map<String, Object> flows = (Map<String, Object>) bucketMeta.get(FLOWS);
+            loadFlows(commit, isLatestCommit, bucket, bucketFilePath, flows, flowSnapshotObjectIds);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void loadFlows(RevCommit commit, boolean isLatestCommit, Bucket bucket, String backetFilePath, Map<String, Object> flows, Map<String, ObjectId> flowSnapshotObjectIds) {
+        for (String flowId : flows.keySet()) {
+            final Map<String, Object> flowMeta = (Map<String, Object>) flows.get(flowId);
+
+            if (!validateRequiredValue(flowMeta, backetFilePath + ":" + flowId, VER, FILE)) {
+                continue;
+            }
+
+            final Flow flow;
+            if (isLatestCommit) {
+                // If this is the latest commit, then create one.
+                flow = bucket.getFlowOrCreate(flowId);
+            } else {
+                // Otherwise non-existing flow means it's already deleted.
+                final Optional<Flow> flowOpt = bucket.getFlow(flowId);
+                if (flowOpt.isPresent()) {
+                    flow = flowOpt.get();
+                } else {
+                    logger.debug("Flow {} does not exist in bucket {}:{} any longer. It may have been deleted.", flowId, bucket.getBucketDirName(), bucket.getBucketId());
+                    continue;
+                }
+            }
+
+            final int version = (int) flowMeta.get(VER);
+            final String flowSnapshotFilename = (String) flowMeta.get(FILE);
+
+            // Since commits are read in LIFO order, avoid old commits overriding the latest pointer.
+            if (!flow.hasVersion(version)) {
+                final Flow.FlowPointer pointer = new Flow.FlowPointer(flowSnapshotFilename);
+                final File flowSnapshotFile = new File(new File(backetFilePath).getParent(), flowSnapshotFilename);
+                final ObjectId objectId = flowSnapshotObjectIds.get(flowSnapshotFile.getPath());
+                if (objectId == null) {
+                    logger.warn("Git object id for Flow {} version {} with path {} in bucket {}:{} was not found. Ignoring this entry.",
+                            flowId, version, flowSnapshotFile.getPath(), bucket.getBucketDirName(), bucket.getBucketId());
+                    continue;
+                }
+                pointer.setGitRev(commit.getName());
+                pointer.setObjectId(objectId.getName());
+                flow.putVersion(version, pointer);
+            }
+        }
+    }
+
+    private boolean validateRequiredValue(final Map map, String nameOfMap, Object ... keys) {
+        for (Object key : keys) {
+            if (!map.containsKey(key)) {
+                logger.warn("{} does not have {}. Skipping it.", nameOfMap, key);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public Bucket getBucketOrCreate(String bucketId) {
+        return buckets.computeIfAbsent(bucketId, k -> new Bucket(bucketId));
+    }
+
+    public Optional<Bucket> getBucket(String bucketId) {
+        return Optional.ofNullable(buckets.get(bucketId));
+    }
+
+
+    void saveBucket(final Bucket bucket, final File bucketDir) throws IOException {
+        final Yaml yaml = new Yaml();
+        final Map<String, Object> serializedBucket = bucket.serialize();
+        final File bucketFile = new File(bucketDir, GitFlowMetaData.BUCKET_FILENAME);
+
+        try (final Writer writer = new OutputStreamWriter(
+                new FileOutputStream(bucketFile), StandardCharsets.UTF_8)) {
+            yaml.dump(serializedBucket, writer);
+        }
+    }
+
+    boolean isGitDirectoryClean() throws GitAPIException {
+        final Status status = new Git(gitRepo).status().call();
+        return status.isClean() && !status.hasUncommittedChanges();
+    }
+
+    /**
+     * Create a Git commit.
+     * @param author The name of a NiFi Registry user who created the snapshot. It will be added to the commit message.
+     * @param message Commit message.
+     * @param bucket A bucket to commit.
+     * @param flowPointer A flow pointer for the flow snapshot which is updated.
+     *                    After a commit is created, new commit rev id and flow snapshot file object id are set to this pointer.
+     *                    It can be null if none of flow content is modified.
+     */
+    void commit(String author, String message, Bucket bucket, Flow.FlowPointer flowPointer) throws GitAPIException, IOException {
+        try (final Git git = new Git(gitRepo)) {
+            // Execute add command for newly added files (if any).
+            git.add().addFilepattern(".").call();
+
+            // Execute add command again for deleted files (if any).
+            git.add().addFilepattern(".").setUpdate(true).call();
+
+            final String commitMessage = isEmpty(author) ? message
+                    : format("%s\n\nBy NiFi Registry user: %s", message, author);
+            final RevCommit commit = git.commit()
+                    .setMessage(commitMessage)
+                    .call();
+
+            if (flowPointer != null) {
+                final RevTree tree = commit.getTree();
+                final String bucketDirName = bucket.getBucketDirName();
+                final String flowSnapshotPath = new File(bucketDirName, flowPointer.getFileName()).getPath();
+                try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) {
+                    treeWalk.addTree(tree);
+
+                    while (treeWalk.next()) {
+                        if (treeWalk.isSubtree()) {
+                            treeWalk.enterSubtree();
+                        } else {
+                            final String pathString = treeWalk.getPathString();
+                            if (pathString.equals(flowSnapshotPath)) {
+                                // Capture updated object id.
+                                final String flowSnapshotObjectId = treeWalk.getObjectId(0).getName();
+                                flowPointer.setObjectId(flowSnapshotObjectId);
+                                break;
+                            }
+                        }
+                    }
+                }
+
+                flowPointer.setGitRev(commit.getName());
+            }
+
+            // Push if necessary.
+            if (!isEmpty(remoteToPush)) {
+                // Use different thread since it takes longer.
+                final long offeredTimestamp = System.currentTimeMillis();
+                if (pushQueue.offer(offeredTimestamp)) {
+                    logger.debug("New push request is offered at {}.", offeredTimestamp);
+                }
+            }
+
+        }
+    }
+
+    byte[] getContent(String objectId) throws IOException {
+        final ObjectId flowSnapshotObjectId = gitRepo.resolve(objectId);
+        return gitRepo.newObjectReader().open(flowSnapshotObjectId).getBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java
new file mode 100644
index 0000000..f642632
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.registry.util.FileUtils.sanitizeFilename;
+
+public class GitFlowPersistenceProvider implements FlowPersistenceProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class);
+    static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory";
+    private static final String REMOTE_TO_PUSH = "Remote To Push";
+    private static final String REMOTE_ACCESS_USER = "Remote Access User";
+    private static final String REMOTE_ACCESS_PASSWORD = "Remote Access Password";
+    static final String SNAPSHOT_EXTENSION = ".snapshot";
+
+    private File flowStorageDir;
+    private GitFlowMetaData flowMetaData;
+
+    @Override
+    public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        flowMetaData = new GitFlowMetaData();
+
+        final Map<String,String> props = configurationContext.getProperties();
+        if (!props.containsKey(FLOW_STORAGE_DIR_PROP)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided");
+        }
+
+        final String flowStorageDirValue = props.get(FLOW_STORAGE_DIR_PROP);
+        if (isEmpty(flowStorageDirValue)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank");
+        }
+
+        flowMetaData.setRemoteToPush(props.get(REMOTE_TO_PUSH));
+
+        final String remoteUser = props.get(REMOTE_ACCESS_USER);
+        final String remotePassword = props.get(REMOTE_ACCESS_PASSWORD);
+        if (!isEmpty(remoteUser) && isEmpty(remotePassword)) {
+            throw new ProviderCreationException(format("The property %s is specified but %s is not." +
+                    " %s is required for username password authentication.",
+                    REMOTE_ACCESS_USER, REMOTE_ACCESS_PASSWORD, REMOTE_ACCESS_PASSWORD));
+        }
+        if (!isEmpty(remotePassword)) {
+            flowMetaData.setRemoteCredential(remoteUser, remotePassword);
+        }
+
+        try {
+            flowStorageDir = new File(flowStorageDirValue);
+            flowMetaData.loadGitRepository(flowStorageDir);
+            flowMetaData.startPushThread();
+            logger.info("Configured GitFlowPersistenceProvider with Flow Storage Directory {}",
+                    new Object[] {flowStorageDir.getAbsolutePath()});
+        } catch (IOException|GitAPIException e) {
+            throw new ProviderCreationException("Failed to load a git repository " + flowStorageDir, e);
+        }
+    }
+
+    @Override
+    public void saveFlowContent(FlowSnapshotContext context, byte[] content) throws FlowPersistenceException {
+
+        try {
+            // Check if working dir is clean, any uncommitted file?
+            if (!flowMetaData.isGitDirectoryClean()) {
+                throw new FlowPersistenceException(format("Git directory %s is not clean" +
+                                " or has uncommitted changes, resolve those changes first to save flow contents.",
+                        flowStorageDir));
+            }
+        } catch (GitAPIException e) {
+            throw new FlowPersistenceException(format("Failed to get Git status for directory %s due to %s",
+                    flowStorageDir, e));
+        }
+
+        final String bucketId = context.getBucketId();
+        final Bucket bucket = flowMetaData.getBucketOrCreate(bucketId);
+        final String currentBucketDirName = bucket.getBucketDirName();
+        final String bucketDirName = sanitizeFilename(context.getBucketName());
+        final boolean isBucketNameChanged = !bucketDirName.equals(currentBucketDirName);
+        bucket.setBucketDirName(bucketDirName);
+
+        final Flow flow = bucket.getFlowOrCreate(context.getFlowId());
+        final String flowSnapshotFilename = sanitizeFilename(context.getFlowName()) + SNAPSHOT_EXTENSION;
+
+        final Optional<String> currentFlowSnapshotFilename = flow
+                .getLatestVersion().map(flow::getFlowVersion).map(Flow.FlowPointer::getFileName);
+
+        // Add new version.
+        final Flow.FlowPointer flowPointer = new Flow.FlowPointer(flowSnapshotFilename);
+        flow.putVersion(context.getVersion(), flowPointer);
+
+        final File bucketDir = new File(flowStorageDir, bucketDirName);
+        final File flowSnippetFile = new File(bucketDir, flowSnapshotFilename);
+
+        final File currentBucketDir = isEmpty(currentBucketDirName) ? null : new File(flowStorageDir, currentBucketDirName);
+        if (currentBucketDir != null && currentBucketDir.isDirectory()) {
+            if (isBucketNameChanged) {
+                logger.debug("Detected bucket name change from {} to {}, moving it.", currentBucketDirName, bucketDirName);
+                if (!currentBucketDir.renameTo(bucketDir)) {
+                    throw new FlowPersistenceException(format("Failed to move existing bucket %s to %s.", currentBucketDir, bucketDir));
+                }
+            }
+        } else {
+            if (!bucketDir.mkdirs()) {
+                throw new FlowPersistenceException(format("Failed to create new bucket dir %s.", bucketDir));
+            }
+        }
+
+
+        try {
+            if (currentFlowSnapshotFilename.isPresent() && !flowSnapshotFilename.equals(currentFlowSnapshotFilename.get())) {
+                // Delete old file if flow name has been changed.
+                final File latestFlowSnapshotFile = new File(bucketDir, currentFlowSnapshotFilename.get());
+                logger.debug("Detected flow name change from {} to {}, deleting the old snapshot file.",
+                        currentFlowSnapshotFilename.get(), flowSnapshotFilename);
+                latestFlowSnapshotFile.delete();
+            }
+
+            // Save the content.
+            try (final OutputStream os = new FileOutputStream(flowSnippetFile)) {
+                os.write(content);
+                os.flush();
+            }
+
+            // Write a bucket file.
+            flowMetaData.saveBucket(bucket, bucketDir);
+
+            // Create a Git Commit.
+            flowMetaData.commit(context.getAuthor(), context.getComments(), bucket, flowPointer);
+
+        } catch (IOException|GitAPIException e) {
+            throw new FlowPersistenceException("Failed to persist flow.", e);
+        }
+
+        // TODO: What if user rebased commits? Version number to Commit ID mapping will be broken.
+    }
+
+    @Override
+    public byte[] getFlowContent(String bucketId, String flowId, int version) throws FlowPersistenceException {
+
+        final Bucket bucket = getBucketOrFail(bucketId);
+        final Flow flow = getFlowOrFail(bucket, flowId);
+        if (!flow.hasVersion(version)) {
+            throw new FlowPersistenceException(format("Flow ID %s version %d was not found in bucket %s:%s.",
+                    flowId, version, bucket.getBucketDirName(), bucketId));
+        }
+
+        final Flow.FlowPointer flowPointer = flow.getFlowVersion(version);
+        try {
+            return flowMetaData.getContent(flowPointer.getObjectId());
+        } catch (IOException e) {
+            throw new FlowPersistenceException(format("Failed to get content of Flow ID %s version %d in bucket %s:%s due to %s.",
+                    flowId, version, bucket.getBucketDirName(), bucketId, e), e);
+        }
+    }
+
+    // TODO: Need to add userId argument?
+    @Override
+    public void deleteAllFlowContent(String bucketId, String flowId) throws FlowPersistenceException {
+        final Bucket bucket = getBucketOrFail(bucketId);
+        final Flow flow = getFlowOrFail(bucket, flowId);
+        final Optional<Integer> latestVersionOpt = flow.getLatestVersion();
+        if (!latestVersionOpt.isPresent()) {
+            throw new IllegalStateException("Flow version is not added yet, can not be deleted.");
+        }
+
+        final Integer latestVersion = latestVersionOpt.get();
+        final Flow.FlowPointer flowPointer = flow.getFlowVersion(latestVersion);
+
+        // Delete the flow snapshot.
+        final File bucketDir = new File(flowStorageDir, bucket.getBucketDirName());
+        final File flowSnapshotFile = new File(bucketDir, flowPointer.getFileName());
+        if (flowSnapshotFile.exists()) {
+            if (!flowSnapshotFile.delete()) {
+                throw new FlowPersistenceException(format("Failed to delete flow content for %s:%s in bucket %s:%s",
+                        flowPointer.getFileName(), flowId, bucket.getBucketDirName(), bucketId));
+            }
+        }
+
+        bucket.removeFlow(flowId);
+
+        try {
+
+            if (bucket.isEmpty()) {
+                // delete bucket dir if this is the last flow.
+                FileUtils.deleteFile(bucketDir, true);
+            } else {
+                // Write a bucket file.
+                flowMetaData.saveBucket(bucket, bucketDir);
+            }
+
+            // Create a Git Commit.
+            final String commitMessage = format("Deleted flow %s:%s in bucket %s:%s.",
+                    flowPointer.getFileName(), flowId, bucket.getBucketDirName(), bucketId);
+            flowMetaData.commit(null, commitMessage, bucket, null);
+
+        } catch (IOException|GitAPIException e) {
+            throw new FlowPersistenceException(format("Failed to delete flow %s:%s in bucket %s:%s due to %s",
+                    flowPointer.getFileName(), flowId, bucket.getBucketDirName(), bucketId, e), e);
+        }
+
+    }
+
+    private Bucket getBucketOrFail(String bucketId) throws FlowPersistenceException {
+        final Optional<Bucket> bucketOpt = flowMetaData.getBucket(bucketId);
+        if (!bucketOpt.isPresent()) {
+            throw new FlowPersistenceException(format("Bucket ID %s was not found.", bucketId));
+        }
+
+        return bucketOpt.get();
+    }
+
+    private Flow getFlowOrFail(Bucket bucket, String flowId) throws FlowPersistenceException {
+        final Optional<Flow> flowOpt = bucket.getFlow(flowId);
+        if (!flowOpt.isPresent()) {
+            throw new FlowPersistenceException(format("Flow ID %s was not found in bucket %s:%s.",
+                    flowId, bucket.getBucketDirName(), bucket.getBucketId()));
+        }
+
+        return flowOpt.get();
+    }
+
+    @Override
+    public void deleteFlowContent(String bucketId, String flowId, int version) throws FlowPersistenceException {
+        // TODO: Do nothing? This signature is not used. Actually there's nothing to do to the old versions as those exist in old commits even if this method is called.
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java
new file mode 100644
index 0000000..9ceb59f
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.hook;
+
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventField;
+import org.apache.nifi.registry.hook.EventHookException;
+import org.apache.nifi.registry.hook.EventHookProvider;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggingEventHookProvider
+    implements EventHookProvider {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(LoggingEventHookProvider.class);
+
+    @Override
+    public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        // Nothing to do
+    }
+
+    @Override
+    public void handle(final Event event) throws EventHookException {
+
+        final StringBuilder builder = new StringBuilder()
+                .append(event.getEventType())
+                .append(" [");
+
+        int count = 0;
+        for (final EventField argument : event.getFields()) {
+            if (count > 0) {
+                builder.append(", ");
+            }
+            builder.append(argument.getName()).append("=").append(argument.getValue());
+            count++;
+        }
+
+        builder.append("] ");
+
+        LOGGER.info(builder.toString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java
new file mode 100644
index 0000000..f96115e
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.hook;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventField;
+import org.apache.nifi.registry.hook.WhitelistFilteringEventHookProvider;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A EventHookProvider that is used to execute a script to handle the event.
+ */
+public class ScriptEventHookProvider
+        extends WhitelistFilteringEventHookProvider {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(ScriptEventHookProvider.class);
+    static final String SCRIPT_PATH_PROP = "Script Path";
+    static final String SCRIPT_WORKDIR_PROP = "Working Directory";
+    private File scriptFile;
+    private File workDirFile;
+
+
+    @Override
+    public void handle(final Event event) {
+        List<String> command = new ArrayList<>();
+        command.add(scriptFile.getAbsolutePath());
+        command.add(event.getEventType().name());
+
+        for (EventField arg : event.getFields()) {
+            command.add(arg.getValue());
+        }
+
+        final String commandString = StringUtils.join(command, " ");
+        final ProcessBuilder builder = new ProcessBuilder(command);
+        builder.directory(workDirFile);
+        LOGGER.debug("Execution of " + commandString);
+
+        try {
+            builder.start();
+        } catch (IOException e) {
+            LOGGER.error("Execution of {0} failed with: {1}", new Object[] { commandString, e.getLocalizedMessage() }, e);
+        }
+    }
+
+    @Override
+    public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        super.onConfigured(configurationContext);
+
+        final Map<String,String> props = configurationContext.getProperties();
+        if (!props.containsKey(SCRIPT_PATH_PROP)) {
+            throw new ProviderCreationException("The property " + SCRIPT_PATH_PROP + " must be provided");
+        }
+
+        final String scripPath = props.get(SCRIPT_PATH_PROP);
+        if (StringUtils.isBlank(scripPath)) {
+            throw new ProviderCreationException("The property " + SCRIPT_PATH_PROP + " cannot be null or blank");
+        }
+
+        if(props.containsKey(SCRIPT_WORKDIR_PROP) && !StringUtils.isBlank(props.get(SCRIPT_WORKDIR_PROP))) {
+            final String workdir = props.get(SCRIPT_WORKDIR_PROP);
+            try {
+                workDirFile = new File(workdir);
+                FileUtils.ensureDirectoryExistAndCanRead(workDirFile);
+            } catch (IOException e) {
+                throw new ProviderCreationException("The working directory " + workdir + " cannot be read.");
+            }
+        }
+
+        scriptFile = new File(scripPath);
+        if(scriptFile.isFile() && scriptFile.canExecute()) {
+            LOGGER.info("Configured ScriptEventHookProvider with script {}", new Object[] {scriptFile.getAbsolutePath()});
+        } else {
+            throw new ProviderCreationException("The script file " + scriptFile.getAbsolutePath() + " cannot be executed.");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/IdentityProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/IdentityProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/IdentityProviderFactory.java
new file mode 100644
index 0000000..3c2a3f4
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/IdentityProviderFactory.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.security.authentication;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.extension.ExtensionManager;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.apache.nifi.registry.properties.SensitivePropertyProtectionException;
+import org.apache.nifi.registry.properties.SensitivePropertyProvider;
+import org.apache.nifi.registry.security.authentication.annotation.IdentityProviderContext;
+import org.apache.nifi.registry.security.authentication.generated.IdentityProviders;
+import org.apache.nifi.registry.security.authentication.generated.Property;
+import org.apache.nifi.registry.security.authentication.generated.Provider;
+import org.apache.nifi.registry.security.util.XmlUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.lang.Nullable;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class IdentityProviderFactory implements IdentityProviderLookup, DisposableBean {
+
+    private static final Logger logger = LoggerFactory.getLogger(IdentityProviderFactory.class);
+    private static final String LOGIN_IDENTITY_PROVIDERS_XSD = "/identity-providers.xsd";
+    private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.security.authentication.generated";
+    private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_GENERATED_PATH, IdentityProviderFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new RuntimeException("Unable to create JAXBContext.");
+        }
+    }
+
+    private NiFiRegistryProperties properties;
+    private ExtensionManager extensionManager;
+    private SensitivePropertyProvider sensitivePropertyProvider;
+    private IdentityProvider identityProvider;
+    private final Map<String, IdentityProvider> identityProviders = new HashMap<>();
+
+    @Autowired
+    public IdentityProviderFactory(
+            final NiFiRegistryProperties properties,
+            final ExtensionManager extensionManager,
+            @Nullable final SensitivePropertyProvider sensitivePropertyProvider) {
+        this.properties = properties;
+        this.extensionManager = extensionManager;
+        this.sensitivePropertyProvider = sensitivePropertyProvider;
+
+        if (this.properties == null) {
+            throw new IllegalStateException("NiFiRegistryProperties cannot be null");
+        }
+
+        if (this.extensionManager == null) {
+            throw new IllegalStateException("ExtensionManager cannot be null");
+        }
+    }
+
+    @Override
+    public IdentityProvider getIdentityProvider(String identifier) {
+        return identityProviders.get(identifier);
+    }
+
+    @Bean
+    @Primary
+    public IdentityProvider getIdentityProvider() throws Exception {
+        if (identityProvider == null) {
+            // look up the login identity provider to use
+            final String loginIdentityProviderIdentifier = properties.getProperty(NiFiRegistryProperties.SECURITY_IDENTITY_PROVIDER);
+
+            // ensure the login identity provider class name was specified
+            if (StringUtils.isNotBlank(loginIdentityProviderIdentifier)) {
+                final IdentityProviders loginIdentityProviderConfiguration = loadLoginIdentityProvidersConfiguration();
+
+                // create each login identity provider
+                for (final Provider provider : loginIdentityProviderConfiguration.getProvider()) {
+                    identityProviders.put(provider.getIdentifier(), createLoginIdentityProvider(provider.getIdentifier(), provider.getClazz()));
+                }
+
+                // configure each login identity provider
+                for (final Provider provider : loginIdentityProviderConfiguration.getProvider()) {
+                    final IdentityProvider instance = identityProviders.get(provider.getIdentifier());
+                    instance.onConfigured(loadLoginIdentityProviderConfiguration(provider));
+                }
+
+                // get the login identity provider instance
+                identityProvider = getIdentityProvider(loginIdentityProviderIdentifier);
+
+                // ensure it was found
+                if (identityProvider == null) {
+                    throw new Exception(String.format("The specified login identity provider '%s' could not be found.", loginIdentityProviderIdentifier));
+                }
+            }
+        }
+
+        return identityProvider;
+    }
+
+    @Override
+    public void destroy() throws Exception {
+        if (identityProviders != null) {
+            identityProviders.entrySet().stream().forEach(e -> e.getValue().preDestruction());
+        }
+    }
+
+    private IdentityProviders loadLoginIdentityProvidersConfiguration() throws Exception {
+        final File loginIdentityProvidersConfigurationFile = properties.getIdentityProviderConfigurationFile();
+
+        // load the users from the specified file
+        if (loginIdentityProvidersConfigurationFile.exists()) {
+            try {
+                // find the schema
+                final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+                final Schema schema = schemaFactory.newSchema(IdentityProviders.class.getResource(LOGIN_IDENTITY_PROVIDERS_XSD));
+
+                // attempt to unmarshal
+                XMLStreamReader xsr = XmlUtils.createSafeReader(new StreamSource(loginIdentityProvidersConfigurationFile));
+                final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+                unmarshaller.setSchema(schema);
+                final JAXBElement<IdentityProviders> element = unmarshaller.unmarshal(xsr, IdentityProviders.class);
+                return element.getValue();
+            } catch (SAXException | JAXBException e) {
+                throw new Exception("Unable to load the login identity provider configuration file at: " + loginIdentityProvidersConfigurationFile.getAbsolutePath());
+            }
+        } else {
+            throw new Exception("Unable to find the login identity provider configuration file at " + loginIdentityProvidersConfigurationFile.getAbsolutePath());
+        }
+    }
+
+    private IdentityProvider createLoginIdentityProvider(final String identifier, final String loginIdentityProviderClassName) throws Exception {
+        final IdentityProvider instance;
+
+        final ClassLoader classLoader = extensionManager.getExtensionClassLoader(loginIdentityProviderClassName);
+        if (classLoader == null) {
+            throw new IllegalStateException("Extension not found in any of the configured class loaders: " + loginIdentityProviderClassName);
+        }
+
+        // attempt to load the class
+        Class<?> rawLoginIdentityProviderClass = Class.forName(loginIdentityProviderClassName, true, classLoader);
+        Class<? extends IdentityProvider> loginIdentityProviderClass = rawLoginIdentityProviderClass.asSubclass(IdentityProvider.class);
+
+        // otherwise create a new instance
+        Constructor constructor = loginIdentityProviderClass.getConstructor();
+        instance = (IdentityProvider) constructor.newInstance();
+
+        // method injection
+        performMethodInjection(instance, loginIdentityProviderClass);
+
+        // field injection
+        performFieldInjection(instance, loginIdentityProviderClass);
+
+        return instance;
+    }
+
+    private IdentityProviderConfigurationContext loadLoginIdentityProviderConfiguration(final Provider provider) {
+        final Map<String, String> providerProperties = new HashMap<>();
+
+        for (final Property property : provider.getProperty()) {
+            if (!StringUtils.isBlank(property.getEncryption())) {
+                String decryptedValue = decryptValue(property.getValue(), property.getEncryption());
+                providerProperties.put(property.getName(), decryptedValue);
+            } else {
+                providerProperties.put(property.getName(), property.getValue());
+            }
+        }
+
+        return new StandardIdentityProviderConfigurationContext(provider.getIdentifier(), this, providerProperties);
+    }
+
+    private void performMethodInjection(final IdentityProvider instance, final Class loginIdentityProviderClass)
+            throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+
+        for (final Method method : loginIdentityProviderClass.getMethods()) {
+            if (method.isAnnotationPresent(IdentityProviderContext.class)) {
+                // make the method accessible
+                final boolean isAccessible = method.isAccessible();
+                method.setAccessible(true);
+
+                try {
+                    final Class<?>[] argumentTypes = method.getParameterTypes();
+
+                    // look for setters (single argument)
+                    if (argumentTypes.length == 1) {
+                        final Class<?> argumentType = argumentTypes[0];
+
+                        // look for well known types
+                        if (NiFiRegistryProperties.class.isAssignableFrom(argumentType)) {
+                            // nifi properties injection
+                            method.invoke(instance, properties);
+                        }
+                    }
+                } finally {
+                    method.setAccessible(isAccessible);
+                }
+            }
+        }
+
+        final Class parentClass = loginIdentityProviderClass.getSuperclass();
+        if (parentClass != null && IdentityProvider.class.isAssignableFrom(parentClass)) {
+            performMethodInjection(instance, parentClass);
+        }
+    }
+
+    private void performFieldInjection(final IdentityProvider instance, final Class loginIdentityProviderClass) throws IllegalArgumentException, IllegalAccessException {
+        for (final Field field : loginIdentityProviderClass.getDeclaredFields()) {
+            if (field.isAnnotationPresent(IdentityProviderContext.class)) {
+                // make the method accessible
+                final boolean isAccessible = field.isAccessible();
+                field.setAccessible(true);
+
+                try {
+                    // get the type
+                    final Class<?> fieldType = field.getType();
+
+                    // only consider this field if it isn't set yet
+                    if (field.get(instance) == null) {
+                        // look for well known types
+                        if (NiFiRegistryProperties.class.isAssignableFrom(fieldType)) {
+                            // nifi properties injection
+                            field.set(instance, properties);
+                        }
+                    }
+
+                } finally {
+                    field.setAccessible(isAccessible);
+                }
+            }
+        }
+
+        final Class parentClass = loginIdentityProviderClass.getSuperclass();
+        if (parentClass != null && IdentityProvider.class.isAssignableFrom(parentClass)) {
+            performFieldInjection(instance, parentClass);
+        }
+    }
+
+    private String decryptValue(String cipherText, String encryptionScheme) throws SensitivePropertyProtectionException {
+        if (sensitivePropertyProvider == null) {
+            throw new SensitivePropertyProtectionException("Sensitive Property Provider dependency was never wired, so protected " +
+                    "properties cannot be decrypted. This usually indicates that a master key for this NiFi Registry was not " +
+                    "detected and configured during the bootstrap startup sequence. Contact the system administrator.");
+        }
+
+        if (!sensitivePropertyProvider.getIdentifierKey().equalsIgnoreCase(encryptionScheme)) {
+            throw new SensitivePropertyProtectionException("Identity Provider configuration XML was protected using " +
+                    encryptionScheme +
+                    ", but the configured Sensitive Property Provider supports " +
+                    sensitivePropertyProvider.getIdentifierKey() +
+                    ". Cannot configure this Identity Provider due to failing to decrypt protected configuration properties.");
+        }
+
+        return sensitivePropertyProvider.unprotect(cipherText);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/StandardIdentityProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/StandardIdentityProviderConfigurationContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/StandardIdentityProviderConfigurationContext.java
new file mode 100644
index 0000000..3e89dcc
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/StandardIdentityProviderConfigurationContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.security.authentication;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class StandardIdentityProviderConfigurationContext implements IdentityProviderConfigurationContext {
+
+    private final String identifier;
+    private final IdentityProviderLookup lookup;
+    private final Map<String, String> properties;
+
+    public StandardIdentityProviderConfigurationContext(String identifier, final IdentityProviderLookup lookup, Map<String, String> properties) {
+        this.identifier = identifier;
+        this.lookup = lookup;
+        this.properties = properties;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public IdentityProviderLookup getIdentityProviderLookup() {
+        return lookup;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return Collections.unmodifiableMap(properties);
+    }
+
+    @Override
+    public String getProperty(String property) {
+        return properties.get(property);
+    }
+
+}