You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2018/09/22 02:11:37 UTC
[38/51] [partial] nifi-registry git commit: NIFIREG-201 Refactoring
project structure to better isolate extensions
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
new file mode 100644
index 0000000..4faf007
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowMetaData.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.api.PushCommand;
+import org.eclipse.jgit.api.Status;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.eclipse.jgit.api.errors.NoHeadException;
+import org.eclipse.jgit.lib.ObjectId;
+import org.eclipse.jgit.lib.Repository;
+import org.eclipse.jgit.revwalk.RevCommit;
+import org.eclipse.jgit.revwalk.RevTree;
+import org.eclipse.jgit.storage.file.FileRepositoryBuilder;
+import org.eclipse.jgit.transport.CredentialsProvider;
+import org.eclipse.jgit.transport.PushResult;
+import org.eclipse.jgit.transport.RemoteConfig;
+import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider;
+import org.eclipse.jgit.treewalk.TreeWalk;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+class GitFlowMetaData {
+
+ static final int CURRENT_LAYOUT_VERSION = 1;
+
+ static final String LAYOUT_VERSION = "layoutVer";
+ static final String BUCKET_ID = "bucketId";
+ static final String FLOWS = "flows";
+ static final String VER = "ver";
+ static final String FILE = "file";
+ static final String BUCKET_FILENAME = "bucket.yml";
+
+ private static final Logger logger = LoggerFactory.getLogger(GitFlowMetaData.class);
+
+ private Repository gitRepo;
+ private String remoteToPush;
+ private CredentialsProvider credentialsProvider;
+
+ private final BlockingQueue<Long> pushQueue = new ArrayBlockingQueue<>(1);
+
+ /**
+ * Bucket ID to Bucket.
+ */
+ private Map<String, Bucket> buckets = new HashMap<>();
+
+    /**
+     * Sets the name of the Git remote that commits are pushed to.
+     * @param remoteToPush name of the remote to push to
+     */
+    public void setRemoteToPush(String remoteToPush) {
+        this.remoteToPush = remoteToPush;
+    }
+
+    /**
+     * Creates the username/password credentials provider that is handed to the push command.
+     * @param userName user name for accessing the remote
+     * @param password password for accessing the remote
+     */
+    public void setRemoteCredential(String userName, String password) {
+        credentialsProvider = new UsernamePasswordCredentialsProvider(userName, password);
+    }
+
+    /**
+     * Opens the Git repository located at the given project root directory.
+     * This method never creates the directory; it only validates and opens it.
+     * @param gitProjectRootDir a root directory of a Git project
+     * @return the opened Repository
+     * @throws IOException if the path is not an existing directory, is not both readable and writable,
+     * or no .git directory is found for it
+     */
+    private Repository openRepository(final File gitProjectRootDir) throws IOException {
+
+        // Availability is checked by hand rather than with FileUtils.ensureDirectoryExistAndCanReadAndWrite,
+        // because that utility would try to create a missing directory.
+        // The git dir should be initialized and configured by users.
+        final boolean isExistingDirectory = gitProjectRootDir.isDirectory();
+        if (!isExistingDirectory) {
+            throw new IOException(format("'%s' is not a directory or does not exist.", gitProjectRootDir));
+        }
+
+        final boolean isReadableAndWritable = gitProjectRootDir.canRead() && gitProjectRootDir.canWrite();
+        if (!isReadableAndWritable) {
+            throw new IOException(format("Directory '%s' does not have read/write privilege.", gitProjectRootDir));
+        }
+
+        // Look for the .git dir, using the project root as a ceiling so that parent directories are not searched.
+        final FileRepositoryBuilder repositoryBuilder = new FileRepositoryBuilder();
+        repositoryBuilder.readEnvironment();
+        repositoryBuilder.setMustExist(true);
+        repositoryBuilder.addCeilingDirectory(gitProjectRootDir);
+        repositoryBuilder.findGitDir(gitProjectRootDir);
+
+        if (repositoryBuilder.getGitDir() != null) {
+            return repositoryBuilder.build();
+        }
+
+        throw new IOException(format("Directory '%s' does not contain a .git directory." +
+                " Please init and configure the directory with 'git init' command before using it from NiFi Registry.",
+                gitProjectRootDir));
+    }
+
+ @SuppressWarnings("unchecked")
+ public void loadGitRepository(File gitProjectRootDir) throws IOException, GitAPIException {
+ gitRepo = openRepository(gitProjectRootDir);
+
+ try (final Git git = new Git(gitRepo)) {
+
+ // Check if remote exists.
+ if (!isEmpty(remoteToPush)) {
+ final List<RemoteConfig> remotes = git.remoteList().call();
+ final boolean isRemoteExist = remotes.stream().anyMatch(remote -> remote.getName().equals(remoteToPush));
+ if (!isRemoteExist) {
+ final List<String> remoteNames = remotes.stream().map(RemoteConfig::getName).collect(Collectors.toList());
+ throw new IllegalArgumentException(
+ format("The configured remote '%s' to push does not exist. Available remotes are %s", remoteToPush, remoteNames));
+ }
+ }
+
+ boolean isLatestCommit = true;
+ try {
+ for (RevCommit commit : git.log().call()) {
+ final String shortCommitId = commit.getId().abbreviate(7).name();
+ logger.debug("Processing a commit: {}", shortCommitId);
+ final RevTree tree = commit.getTree();
+
+ try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) {
+ treeWalk.addTree(tree);
+
+ // Path -> ObjectId
+ final Map<String, ObjectId> bucketObjectIds = new HashMap<>();
+ final Map<String, ObjectId> flowSnapshotObjectIds = new HashMap<>();
+ while (treeWalk.next()) {
+ if (treeWalk.isSubtree()) {
+ treeWalk.enterSubtree();
+ } else {
+ final String pathString = treeWalk.getPathString();
+ // TODO: what is this nth?? When does it get grater than 0? Tree count seems to be always 1..
+ if (pathString.endsWith("/" + BUCKET_FILENAME)) {
+ bucketObjectIds.put(pathString, treeWalk.getObjectId(0));
+ } else if (pathString.endsWith(GitFlowPersistenceProvider.SNAPSHOT_EXTENSION)) {
+ flowSnapshotObjectIds.put(pathString, treeWalk.getObjectId(0));
+ }
+ }
+ }
+
+ if (bucketObjectIds.isEmpty()) {
+ // No bucket.yml means at this point, all flows are deleted. No need to scan older commits because those are already deleted.
+ logger.debug("Tree at commit {} does not contain any " + BUCKET_FILENAME + ". Stop loading commits here.", shortCommitId);
+ return;
+ }
+
+ loadBuckets(gitRepo, commit, isLatestCommit, bucketObjectIds, flowSnapshotObjectIds);
+ isLatestCommit = false;
+ }
+ }
+ } catch (NoHeadException e) {
+ logger.debug("'{}' does not have any commit yet. Starting with empty buckets.", gitProjectRootDir);
+ }
+
+ }
+ }
+
+ void startPushThread() {
+ // If successfully loaded, start pushing thread if necessary.
+ if (isEmpty(remoteToPush)) {
+ return;
+ }
+
+ final ThreadFactory threadFactory = new BasicThreadFactory.Builder()
+ .daemon(true).namingPattern(getClass().getSimpleName() + " Push thread").build();
+
+ // Use scheduled fixed delay to control the minimum interval between push activities.
+ // The necessity of executing push is controlled by offering messages to the pushQueue.
+ // If multiple commits are made within this time window, those are pushed by a single push execution.
+ final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
+ executorService.scheduleWithFixedDelay(() -> {
+
+ final Long offeredTimestamp;
+ try {
+ offeredTimestamp = pushQueue.take();
+ } catch (InterruptedException e) {
+ logger.warn("Waiting for push request has been interrupted due to {}", e.getMessage(), e);
+ return;
+ }
+
+ logger.debug("Took a push request sent at {} to {}...", offeredTimestamp, remoteToPush);
+ final PushCommand pushCommand = new Git(gitRepo).push().setRemote(remoteToPush);
+ if (credentialsProvider != null) {
+ pushCommand.setCredentialsProvider(credentialsProvider);
+ }
+
+ try {
+ final Iterable<PushResult> pushResults = pushCommand.call();
+ for (PushResult pushResult : pushResults) {
+ logger.debug(pushResult.getMessages());
+ }
+ } catch (GitAPIException e) {
+ logger.error(format("Failed to push commits to %s due to %s", remoteToPush, e), e);
+ }
+
+ }, 10, 10, TimeUnit.SECONDS);
+ }
+
+    /**
+     * Loads the bucket files found in the tree of a commit into the in-memory bucket map.
+     * Bucket files lacking required values, or having an unsupported layout version, are skipped.
+     * @param gitRepo the repository to read bucket file contents from
+     * @param commit the commit being processed
+     * @param isLatestCommit true if the commit is the newest one; only then are missing buckets created
+     * @param bucketObjectIds bucket file path to Git object id
+     * @param flowSnapshotObjectIds flow snapshot file path to Git object id
+     * @throws IOException if a bucket file can not be read from the repository
+     */
+    @SuppressWarnings("unchecked")
+    private void loadBuckets(Repository gitRepo, RevCommit commit, boolean isLatestCommit, Map<String, ObjectId> bucketObjectIds, Map<String, ObjectId> flowSnapshotObjectIds) throws IOException {
+        final Yaml yaml = new Yaml();
+        for (Map.Entry<String, ObjectId> bucketEntry : bucketObjectIds.entrySet()) {
+            final String bucketFilePath = bucketEntry.getKey();
+            final Map<String, Object> bucketMeta;
+            // Repository.open is a one-shot lookup, so no ObjectReader is left unreleased.
+            // NOTE(review): yaml.load can instantiate arbitrary types; confirm that repository contents are trusted.
+            try (InputStream bucketIn = gitRepo.open(bucketEntry.getValue()).openStream()) {
+                bucketMeta = yaml.load(bucketIn);
+            }
+
+            // An empty YAML document is loaded as null.
+            if (bucketMeta == null) {
+                logger.warn("{} is empty. Skipping it.", bucketFilePath);
+                continue;
+            }
+
+            if (!validateRequiredValue(bucketMeta, bucketFilePath, LAYOUT_VERSION, BUCKET_ID, FLOWS)) {
+                continue;
+            }
+
+            int layoutVersion = (int) bucketMeta.get(LAYOUT_VERSION);
+            if (layoutVersion > CURRENT_LAYOUT_VERSION) {
+                logger.warn("{} has unsupported {} {}. This Registry can only support {} or lower. Skipping it.",
+                        bucketFilePath, LAYOUT_VERSION, layoutVersion, CURRENT_LAYOUT_VERSION);
+                continue;
+            }
+
+            final String bucketId = (String) bucketMeta.get(BUCKET_ID);
+
+            final Bucket bucket;
+            if (isLatestCommit) {
+                // If this is the latest commit, then create one.
+                bucket = getBucketOrCreate(bucketId);
+            } else {
+                // Otherwise non-existing bucket means it's already deleted.
+                final Optional<Bucket> bucketOpt = getBucket(bucketId);
+                if (bucketOpt.isPresent()) {
+                    bucket = bucketOpt.get();
+                } else {
+                    logger.debug("Bucket {} does not exist any longer. It may have been deleted.", bucketId);
+                    continue;
+                }
+            }
+
+            // Since the bucket name is restored from the path name, it can differ from the original bucket name when it was sanitized.
+            final String bucketDirName = bucketFilePath.substring(0, bucketFilePath.lastIndexOf("/"));
+
+            // Since commits are read in LIFO order, avoid old commits overriding the latest bucket name.
+            if (isEmpty(bucket.getBucketDirName())) {
+                bucket.setBucketDirName(bucketDirName);
+            }
+
+            // The 'flows' key can be present with a null value, which means there is no flow to load.
+            final Map<String, Object> flows = (Map<String, Object>) bucketMeta.get(FLOWS);
+            if (flows != null) {
+                loadFlows(commit, isLatestCommit, bucket, bucketFilePath, flows, flowSnapshotObjectIds);
+            }
+        }
+    }
+
+    /**
+     * Loads the flow entries of a bucket file into the given bucket.
+     * A version which is already known to a flow is not overridden.
+     * @param commit the commit being processed; its name is recorded as the Git revision of new pointers
+     * @param isLatestCommit true if the commit is the newest one; only then are missing flows created
+     * @param bucket the bucket to add flows to
+     * @param bucketFilePath path of the bucket file in the Git tree, used to locate snapshot files and in log messages
+     * @param flows flow id to flow metadata, as read from the bucket file
+     * @param flowSnapshotObjectIds flow snapshot file path to Git object id
+     */
+    @SuppressWarnings("unchecked")
+    private void loadFlows(RevCommit commit, boolean isLatestCommit, Bucket bucket, String bucketFilePath, Map<String, Object> flows, Map<String, ObjectId> flowSnapshotObjectIds) {
+        // Paths in a Git tree always use '/' as the separator. Building them with java.io.File would
+        // produce '\' on Windows, and the lookup in flowSnapshotObjectIds would never match.
+        final int lastSeparatorIndex = bucketFilePath.lastIndexOf('/');
+        final String bucketDirPrefix = lastSeparatorIndex < 0 ? "" : bucketFilePath.substring(0, lastSeparatorIndex) + "/";
+
+        for (Map.Entry<String, Object> flowEntry : flows.entrySet()) {
+            final String flowId = flowEntry.getKey();
+            final Map<String, Object> flowMeta = (Map<String, Object>) flowEntry.getValue();
+
+            // A flow id without any value is loaded as null.
+            if (flowMeta == null) {
+                logger.warn("{} is empty. Skipping it.", bucketFilePath + ":" + flowId);
+                continue;
+            }
+
+            if (!validateRequiredValue(flowMeta, bucketFilePath + ":" + flowId, VER, FILE)) {
+                continue;
+            }
+
+            final Flow flow;
+            if (isLatestCommit) {
+                // If this is the latest commit, then create one.
+                flow = bucket.getFlowOrCreate(flowId);
+            } else {
+                // Otherwise non-existing flow means it's already deleted.
+                final Optional<Flow> flowOpt = bucket.getFlow(flowId);
+                if (flowOpt.isPresent()) {
+                    flow = flowOpt.get();
+                } else {
+                    logger.debug("Flow {} does not exist in bucket {}:{} any longer. It may have been deleted.", flowId, bucket.getBucketDirName(), bucket.getBucketId());
+                    continue;
+                }
+            }
+
+            final int version = (int) flowMeta.get(VER);
+            final String flowSnapshotFilename = (String) flowMeta.get(FILE);
+
+            // Since commits are read in LIFO order, avoid old commits overriding the latest pointer.
+            if (!flow.hasVersion(version)) {
+                final Flow.FlowPointer pointer = new Flow.FlowPointer(flowSnapshotFilename);
+                final String flowSnapshotPath = bucketDirPrefix + flowSnapshotFilename;
+                final ObjectId objectId = flowSnapshotObjectIds.get(flowSnapshotPath);
+                if (objectId == null) {
+                    logger.warn("Git object id for Flow {} version {} with path {} in bucket {}:{} was not found. Ignoring this entry.",
+                            flowId, version, flowSnapshotPath, bucket.getBucketDirName(), bucket.getBucketId());
+                    continue;
+                }
+                pointer.setGitRev(commit.getName());
+                pointer.setObjectId(objectId.getName());
+                flow.putVersion(version, pointer);
+            }
+        }
+    }
+
+    /**
+     * Checks that the given map contains all of the required keys, logging a warning for the first missing one.
+     * @param map the map to check; a null map is treated as missing everything
+     * @param nameOfMap a name identifying the map in the warning message
+     * @param keys the keys which must exist in the map
+     * @return true if the map is not null and contains every key, false otherwise
+     */
+    private boolean validateRequiredValue(final Map<?, ?> map, String nameOfMap, Object ... keys) {
+        if (map == null) {
+            // An empty YAML node is loaded as null; report it rather than failing with a NullPointerException.
+            logger.warn("{} is empty. Skipping it.", nameOfMap);
+            return false;
+        }
+
+        for (Object key : keys) {
+            if (!map.containsKey(key)) {
+                logger.warn("{} does not have {}. Skipping it.", nameOfMap, key);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Returns the bucket registered for the given id, registering a new one first if there is none.
+     * @param bucketId the bucket id
+     * @return the existing or the newly created bucket
+     */
+    public Bucket getBucketOrCreate(String bucketId) {
+        return buckets.computeIfAbsent(bucketId, Bucket::new);
+    }
+
+    /**
+     * Looks up the bucket registered for the given id.
+     * @param bucketId the bucket id
+     * @return the bucket, or an empty Optional if no bucket is registered for the id
+     */
+    public Optional<Bucket> getBucket(String bucketId) {
+        final Bucket knownBucket = buckets.get(bucketId);
+        return Optional.ofNullable(knownBucket);
+    }
+
+
+    /**
+     * Writes the serialized bucket as a UTF-8 encoded YAML file named bucket.yml into the given directory.
+     * @param bucket the bucket to serialize
+     * @param bucketDir the directory to write the bucket file into
+     * @throws IOException if the bucket file can not be written
+     */
+    void saveBucket(final Bucket bucket, final File bucketDir) throws IOException {
+        final Map<String, Object> bucketContent = bucket.serialize();
+        final File target = new File(bucketDir, GitFlowMetaData.BUCKET_FILENAME);
+
+        try (final FileOutputStream fileOut = new FileOutputStream(target);
+             final Writer yamlWriter = new OutputStreamWriter(fileOut, StandardCharsets.UTF_8)) {
+            new Yaml().dump(bucketContent, yamlWriter);
+        }
+    }
+
+ boolean isGitDirectoryClean() throws GitAPIException {
+ final Status status = new Git(gitRepo).status().call();
+ return status.isClean() && !status.hasUncommittedChanges();
+ }
+
+    /**
+     * Create a Git commit.
+     * @param author The name of a NiFi Registry user who created the snapshot. It will be added to the commit message.
+     * @param message Commit message.
+     * @param bucket A bucket to commit.
+     * @param flowPointer A flow pointer for the flow snapshot which is updated.
+     *                    After a commit is created, new commit rev id and flow snapshot file object id are set to this pointer.
+     *                    It can be null if none of flow content is modified.
+     * @throws GitAPIException if adding files or creating the commit fails
+     * @throws IOException if the tree of the created commit can not be read
+     */
+    void commit(String author, String message, Bucket bucket, Flow.FlowPointer flowPointer) throws GitAPIException, IOException {
+        try (final Git git = new Git(gitRepo)) {
+            // Execute add command for newly added files (if any).
+            git.add().addFilepattern(".").call();
+
+            // Execute add command again for deleted files (if any).
+            git.add().addFilepattern(".").setUpdate(true).call();
+
+            final String commitMessage = isEmpty(author) ? message
+                    : format("%s\n\nBy NiFi Registry user: %s", message, author);
+            final RevCommit commit = git.commit()
+                    .setMessage(commitMessage)
+                    .call();
+
+            if (flowPointer != null) {
+                final RevTree tree = commit.getTree();
+                // Paths in a Git tree always use '/' as the separator. Building the path with java.io.File
+                // would produce '\' on Windows, and the snapshot file would never be found in the tree.
+                final String flowSnapshotPath = bucket.getBucketDirName() + "/" + flowPointer.getFileName();
+                try (final TreeWalk treeWalk = new TreeWalk(gitRepo)) {
+                    treeWalk.addTree(tree);
+
+                    while (treeWalk.next()) {
+                        if (treeWalk.isSubtree()) {
+                            treeWalk.enterSubtree();
+                        } else if (flowSnapshotPath.equals(treeWalk.getPathString())) {
+                            // Capture updated object id.
+                            flowPointer.setObjectId(treeWalk.getObjectId(0).getName());
+                            break;
+                        }
+                    }
+                }
+
+                flowPointer.setGitRev(commit.getName());
+            }
+
+            // Push if necessary.
+            if (!isEmpty(remoteToPush)) {
+                // Pushing is done by a different thread since it takes longer.
+                // The queue holds a single request; an offer is rejected while a request is already pending.
+                final long offeredTimestamp = System.currentTimeMillis();
+                if (pushQueue.offer(offeredTimestamp)) {
+                    logger.debug("New push request is offered at {}.", offeredTimestamp);
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Reads the content of a Git object.
+     * @param objectId a Git object id, or any other revision expression resolvable by the repository
+     * @return the bytes of the object
+     * @throws IOException if the id is null, can not be resolved, or the object can not be read
+     */
+    byte[] getContent(String objectId) throws IOException {
+        final ObjectId flowSnapshotObjectId = objectId == null ? null : gitRepo.resolve(objectId);
+        if (flowSnapshotObjectId == null) {
+            throw new IOException(format("Git object '%s' could not be resolved.", objectId));
+        }
+
+        // Repository.open is a one-shot lookup, so no ObjectReader is left unreleased.
+        return gitRepo.open(flowSnapshotObjectId).getBytes();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java
new file mode 100644
index 0000000..f642632
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/flow/git/GitFlowPersistenceProvider.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.flow.git;
+
+import org.apache.nifi.registry.flow.FlowPersistenceException;
+import org.apache.nifi.registry.flow.FlowPersistenceProvider;
+import org.apache.nifi.registry.flow.FlowSnapshotContext;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.eclipse.jgit.api.errors.GitAPIException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.lang.String.format;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.nifi.registry.util.FileUtils.sanitizeFilename;
+
+public class GitFlowPersistenceProvider implements FlowPersistenceProvider {
+
+    // Log under this class; the logger was previously created for GitFlowMetaData by mistake.
+    private static final Logger logger = LoggerFactory.getLogger(GitFlowPersistenceProvider.class);
+ static final String FLOW_STORAGE_DIR_PROP = "Flow Storage Directory";
+ private static final String REMOTE_TO_PUSH = "Remote To Push";
+ private static final String REMOTE_ACCESS_USER = "Remote Access User";
+ private static final String REMOTE_ACCESS_PASSWORD = "Remote Access Password";
+ static final String SNAPSHOT_EXTENSION = ".snapshot";
+
+ private File flowStorageDir;
+ private GitFlowMetaData flowMetaData;
+
+    /**
+     * Reads the provider properties, loads the Git repository in the flow storage directory
+     * and starts the push thread.
+     * @param configurationContext the context holding the provider properties
+     * @throws ProviderCreationException if the storage directory property is missing or empty,
+     * if a remote user is given without a password, or if the Git repository can not be loaded
+     */
+    @Override
+    public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        flowMetaData = new GitFlowMetaData();
+
+        final Map<String,String> properties = configurationContext.getProperties();
+        if (!properties.containsKey(FLOW_STORAGE_DIR_PROP)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " must be provided");
+        }
+
+        final String storageDirPath = properties.get(FLOW_STORAGE_DIR_PROP);
+        if (isEmpty(storageDirPath)) {
+            throw new ProviderCreationException("The property " + FLOW_STORAGE_DIR_PROP + " cannot be null or blank");
+        }
+
+        flowMetaData.setRemoteToPush(properties.get(REMOTE_TO_PUSH));
+
+        final String remoteUser = properties.get(REMOTE_ACCESS_USER);
+        final String remotePassword = properties.get(REMOTE_ACCESS_PASSWORD);
+        final boolean isPasswordGiven = !isEmpty(remotePassword);
+        if (!isEmpty(remoteUser) && !isPasswordGiven) {
+            throw new ProviderCreationException(format("The property %s is specified but %s is not." +
+                    " %s is required for username password authentication.",
+                    REMOTE_ACCESS_USER, REMOTE_ACCESS_PASSWORD, REMOTE_ACCESS_PASSWORD));
+        }
+        if (isPasswordGiven) {
+            flowMetaData.setRemoteCredential(remoteUser, remotePassword);
+        }
+
+        try {
+            flowStorageDir = new File(storageDirPath);
+            flowMetaData.loadGitRepository(flowStorageDir);
+            flowMetaData.startPushThread();
+            logger.info("Configured GitFlowPersistenceProvider with Flow Storage Directory {}",
+                    flowStorageDir.getAbsolutePath());
+        } catch (IOException|GitAPIException e) {
+            throw new ProviderCreationException("Failed to load a git repository " + flowStorageDir, e);
+        }
+    }
+
+    /**
+     * Saves the flow content as a snapshot file in the bucket directory, writes the bucket file
+     * and creates a Git commit for the change.
+     * The bucket directory is moved when the bucket name has changed, and the previous snapshot file
+     * is deleted when the flow name has changed.
+     * @param context the context describing the bucket, the flow and the version being saved
+     * @param content the flow content to save
+     * @throws FlowPersistenceException if the Git directory is not clean, or the content can not be written or committed
+     */
+    @Override
+    public void saveFlowContent(FlowSnapshotContext context, byte[] content) throws FlowPersistenceException {
+
+        try {
+            // Check if working dir is clean, any uncommitted file?
+            if (!flowMetaData.isGitDirectoryClean()) {
+                throw new FlowPersistenceException(format("Git directory %s is not clean" +
+                        " or has uncommitted changes, resolve those changes first to save flow contents.",
+                        flowStorageDir));
+            }
+        } catch (GitAPIException e) {
+            // Keep the original exception as the cause so that its stack trace is not lost.
+            throw new FlowPersistenceException(format("Failed to get Git status for directory %s due to %s",
+                    flowStorageDir, e), e);
+        }
+
+        final String bucketId = context.getBucketId();
+        final Bucket bucket = flowMetaData.getBucketOrCreate(bucketId);
+        final String currentBucketDirName = bucket.getBucketDirName();
+        final String bucketDirName = sanitizeFilename(context.getBucketName());
+        final boolean isBucketNameChanged = !bucketDirName.equals(currentBucketDirName);
+        bucket.setBucketDirName(bucketDirName);
+
+        final Flow flow = bucket.getFlowOrCreate(context.getFlowId());
+        final String flowSnapshotFilename = sanitizeFilename(context.getFlowName()) + SNAPSHOT_EXTENSION;
+
+        // Capture the file name of the latest version before the new version is added.
+        final Optional<String> currentFlowSnapshotFilename = flow
+                .getLatestVersion().map(flow::getFlowVersion).map(Flow.FlowPointer::getFileName);
+
+        // Add new version.
+        final Flow.FlowPointer flowPointer = new Flow.FlowPointer(flowSnapshotFilename);
+        flow.putVersion(context.getVersion(), flowPointer);
+
+        final File bucketDir = new File(flowStorageDir, bucketDirName);
+        final File flowSnippetFile = new File(bucketDir, flowSnapshotFilename);
+
+        final File currentBucketDir = isEmpty(currentBucketDirName) ? null : new File(flowStorageDir, currentBucketDirName);
+        if (currentBucketDir != null && currentBucketDir.isDirectory()) {
+            if (isBucketNameChanged) {
+                logger.debug("Detected bucket name change from {} to {}, moving it.", currentBucketDirName, bucketDirName);
+                if (!currentBucketDir.renameTo(bucketDir)) {
+                    throw new FlowPersistenceException(format("Failed to move existing bucket %s to %s.", currentBucketDir, bucketDir));
+                }
+            }
+        } else if (!bucketDir.isDirectory() && !bucketDir.mkdirs()) {
+            // mkdirs() also returns false for an already existing directory, which is not a failure.
+            throw new FlowPersistenceException(format("Failed to create new bucket dir %s.", bucketDir));
+        }
+
+
+        try {
+            if (currentFlowSnapshotFilename.isPresent() && !flowSnapshotFilename.equals(currentFlowSnapshotFilename.get())) {
+                // Delete old file if flow name has been changed.
+                final File latestFlowSnapshotFile = new File(bucketDir, currentFlowSnapshotFilename.get());
+                logger.debug("Detected flow name change from {} to {}, deleting the old snapshot file.",
+                        currentFlowSnapshotFilename.get(), flowSnapshotFilename);
+                if (latestFlowSnapshotFile.exists() && !latestFlowSnapshotFile.delete()) {
+                    // Keep saving the new content, but make the left-over file visible in the log.
+                    logger.warn("Failed to delete the old snapshot file {}.", latestFlowSnapshotFile);
+                }
+            }
+
+            // Save the content.
+            try (final OutputStream os = new FileOutputStream(flowSnippetFile)) {
+                os.write(content);
+                os.flush();
+            }
+
+            // Write a bucket file.
+            flowMetaData.saveBucket(bucket, bucketDir);
+
+            // Create a Git Commit.
+            flowMetaData.commit(context.getAuthor(), context.getComments(), bucket, flowPointer);
+
+        } catch (IOException|GitAPIException e) {
+            throw new FlowPersistenceException("Failed to persist flow.", e);
+        }
+
+        // TODO: What if user rebased commits? Version number to Commit ID mapping will be broken.
+    }
+
+    /**
+     * Returns the content of the given flow version, read from the Git object recorded for that version.
+     * @param bucketId the bucket id
+     * @param flowId the flow id
+     * @param version the flow version
+     * @return the bytes of the flow snapshot
+     * @throws FlowPersistenceException if the bucket, the flow or the version is not found, or the content can not be read
+     */
+    @Override
+    public byte[] getFlowContent(String bucketId, String flowId, int version) throws FlowPersistenceException {
+
+        final Bucket bucket = getBucketOrFail(bucketId);
+        final Flow flow = getFlowOrFail(bucket, flowId);
+
+        if (flow.hasVersion(version)) {
+            final Flow.FlowPointer pointer = flow.getFlowVersion(version);
+            try {
+                return flowMetaData.getContent(pointer.getObjectId());
+            } catch (IOException e) {
+                throw new FlowPersistenceException(format("Failed to get content of Flow ID %s version %d in bucket %s:%s due to %s.",
+                        flowId, version, bucket.getBucketDirName(), bucketId, e), e);
+            }
+        }
+
+        throw new FlowPersistenceException(format("Flow ID %s version %d was not found in bucket %s:%s.",
+                flowId, version, bucket.getBucketDirName(), bucketId));
+    }
+
+    // TODO: Need to add userId argument?
+    /**
+     * Deletes the snapshot file of the latest version of a flow, removes the flow from its bucket
+     * and creates a Git commit for the deletion.
+     * The bucket directory is deleted when the bucket has become empty; otherwise the bucket file is rewritten.
+     * @param bucketId the bucket id
+     * @param flowId the flow id
+     * @throws FlowPersistenceException if the bucket or the flow is not found, or deleting or committing fails
+     * @throws IllegalStateException if the flow does not have any version
+     */
+    @Override
+    public void deleteAllFlowContent(String bucketId, String flowId) throws FlowPersistenceException {
+        final Bucket bucket = getBucketOrFail(bucketId);
+        final Flow flow = getFlowOrFail(bucket, flowId);
+
+        final Integer latestVersion = flow.getLatestVersion()
+                .orElseThrow(() -> new IllegalStateException("Flow version is not added yet, can not be deleted."));
+        final Flow.FlowPointer flowPointer = flow.getFlowVersion(latestVersion);
+
+        // Delete the flow snapshot.
+        final File bucketDir = new File(flowStorageDir, bucket.getBucketDirName());
+        final File snapshotFile = new File(bucketDir, flowPointer.getFileName());
+        if (snapshotFile.exists() && !snapshotFile.delete()) {
+            throw new FlowPersistenceException(format("Failed to delete flow content for %s:%s in bucket %s:%s",
+                    flowPointer.getFileName(), flowId, bucket.getBucketDirName(), bucketId));
+        }
+
+        bucket.removeFlow(flowId);
+
+        try {
+
+            if (!bucket.isEmpty()) {
+                // Other flows remain, write a bucket file.
+                flowMetaData.saveBucket(bucket, bucketDir);
+            } else {
+                // This was the last flow, delete the bucket dir.
+                FileUtils.deleteFile(bucketDir, true);
+            }
+
+            // Create a Git Commit.
+            final String commitMessage = format("Deleted flow %s:%s in bucket %s:%s.",
+                    flowPointer.getFileName(), flowId, bucket.getBucketDirName(), bucketId);
+            flowMetaData.commit(null, commitMessage, bucket, null);
+
+        } catch (IOException|GitAPIException e) {
+            throw new FlowPersistenceException(format("Failed to delete flow %s:%s in bucket %s:%s due to %s",
+                    flowPointer.getFileName(), flowId, bucket.getBucketDirName(), bucketId, e), e);
+        }
+
+    }
+
+    /**
+     * Returns the bucket for the given id.
+     * @param bucketId the bucket id
+     * @return the bucket
+     * @throws FlowPersistenceException if no bucket is found for the id
+     */
+    private Bucket getBucketOrFail(String bucketId) throws FlowPersistenceException {
+        return flowMetaData.getBucket(bucketId)
+                .orElseThrow(() -> new FlowPersistenceException(format("Bucket ID %s was not found.", bucketId)));
+    }
+
+    /**
+     * Returns the flow with the given id from the bucket.
+     * @param bucket the bucket to look the flow up in
+     * @param flowId the flow id
+     * @return the flow
+     * @throws FlowPersistenceException if the bucket does not have a flow for the id
+     */
+    private Flow getFlowOrFail(Bucket bucket, String flowId) throws FlowPersistenceException {
+        return bucket.getFlow(flowId)
+                .orElseThrow(() -> new FlowPersistenceException(format("Flow ID %s was not found in bucket %s:%s.",
+                        flowId, bucket.getBucketDirName(), bucket.getBucketId())));
+    }
+
+    /**
+     * Does nothing in this provider.
+     * @param bucketId the bucket id
+     * @param flowId the flow id
+     * @param version the flow version
+     */
+    @Override
+    public void deleteFlowContent(String bucketId, String flowId, int version) throws FlowPersistenceException {
+        // TODO: Do nothing? This signature is not used.
+        // There is nothing to do for old versions anyway, as those still exist in old commits even if this method is called.
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java
new file mode 100644
index 0000000..9ceb59f
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/LoggingEventHookProvider.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.hook;
+
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventField;
+import org.apache.nifi.registry.hook.EventHookException;
+import org.apache.nifi.registry.hook.EventHookProvider;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggingEventHookProvider
+ implements EventHookProvider {
+
+ static final Logger LOGGER = LoggerFactory.getLogger(LoggingEventHookProvider.class);
+
+ @Override
+ public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+ // Nothing to do
+ }
+
+ @Override
+ public void handle(final Event event) throws EventHookException {
+
+ final StringBuilder builder = new StringBuilder()
+ .append(event.getEventType())
+ .append(" [");
+
+ int count = 0;
+ for (final EventField argument : event.getFields()) {
+ if (count > 0) {
+ builder.append(", ");
+ }
+ builder.append(argument.getName()).append("=").append(argument.getValue());
+ count++;
+ }
+
+ builder.append("] ");
+
+ LOGGER.info(builder.toString());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java
new file mode 100644
index 0000000..f96115e
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/provider/hook/ScriptEventHookProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.provider.hook;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.hook.Event;
+import org.apache.nifi.registry.hook.EventField;
+import org.apache.nifi.registry.hook.WhitelistFilteringEventHookProvider;
+import org.apache.nifi.registry.provider.ProviderConfigurationContext;
+import org.apache.nifi.registry.provider.ProviderCreationException;
+import org.apache.nifi.registry.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An EventHookProvider that executes a script to handle each event.
+ *
+ * The script is invoked with the event type name as its first argument, followed by the value of
+ * every event field in the order the event returns them. The "Script Path" property is required;
+ * the "Working Directory" property is optional.
+ */
+public class ScriptEventHookProvider
+        extends WhitelistFilteringEventHookProvider {
+
+    static final Logger LOGGER = LoggerFactory.getLogger(ScriptEventHookProvider.class);
+    static final String SCRIPT_PATH_PROP = "Script Path";
+    static final String SCRIPT_WORKDIR_PROP = "Working Directory";
+    private File scriptFile;
+    private File workDirFile;
+
+
+    /**
+     * Starts the configured script for the given event. The process is started but not waited
+     * for, and a failure to start it is logged rather than rethrown.
+     *
+     * @param event the event whose type and field values are passed to the script as arguments
+     */
+    @Override
+    public void handle(final Event event) {
+        final List<String> command = new ArrayList<>();
+        command.add(scriptFile.getAbsolutePath());
+        command.add(event.getEventType().name());
+
+        for (final EventField arg : event.getFields()) {
+            command.add(arg.getValue());
+        }
+
+        final String commandString = StringUtils.join(command, " ");
+        final ProcessBuilder builder = new ProcessBuilder(command);
+        // workDirFile stays null when no working directory was configured.
+        builder.directory(workDirFile);
+        LOGGER.debug("Execution of {}", commandString);
+
+        try {
+            builder.start();
+        } catch (IOException e) {
+            // SLF4J only substitutes "{}" placeholders (not MessageFormat-style "{0}");
+            // the trailing Throwable is logged with its stack trace.
+            LOGGER.error("Execution of {} failed with: {}", commandString, e.getLocalizedMessage(), e);
+        }
+    }
+
+    /**
+     * Reads and validates the script path and the optional working directory.
+     *
+     * @param configurationContext the context providing the provider properties
+     * @throws ProviderCreationException if the script path is missing or blank, the working
+     *         directory cannot be read, or the script is not an executable file
+     */
+    @Override
+    public void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException {
+        super.onConfigured(configurationContext);
+
+        final Map<String,String> props = configurationContext.getProperties();
+        if (!props.containsKey(SCRIPT_PATH_PROP)) {
+            throw new ProviderCreationException("The property " + SCRIPT_PATH_PROP + " must be provided");
+        }
+
+        final String scriptPath = props.get(SCRIPT_PATH_PROP);
+        if (StringUtils.isBlank(scriptPath)) {
+            throw new ProviderCreationException("The property " + SCRIPT_PATH_PROP + " cannot be null or blank");
+        }
+
+        // A missing key yields null, which isBlank treats the same as a blank value.
+        final String workdir = props.get(SCRIPT_WORKDIR_PROP);
+        if (!StringUtils.isBlank(workdir)) {
+            try {
+                workDirFile = new File(workdir);
+                FileUtils.ensureDirectoryExistAndCanRead(workDirFile);
+            } catch (IOException e) {
+                // Keep the underlying I/O failure as the cause so the reason is not lost.
+                throw new ProviderCreationException("The working directory " + workdir + " cannot be read.", e);
+            }
+        }
+
+        scriptFile = new File(scriptPath);
+        if (!scriptFile.isFile() || !scriptFile.canExecute()) {
+            throw new ProviderCreationException("The script file " + scriptFile.getAbsolutePath() + " cannot be executed.");
+        }
+
+        LOGGER.info("Configured ScriptEventHookProvider with script {}", scriptFile.getAbsolutePath());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/IdentityProviderFactory.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/IdentityProviderFactory.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/IdentityProviderFactory.java
new file mode 100644
index 0000000..3c2a3f4
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/IdentityProviderFactory.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.security.authentication;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.registry.extension.ExtensionManager;
+import org.apache.nifi.registry.properties.NiFiRegistryProperties;
+import org.apache.nifi.registry.properties.SensitivePropertyProtectionException;
+import org.apache.nifi.registry.properties.SensitivePropertyProvider;
+import org.apache.nifi.registry.security.authentication.annotation.IdentityProviderContext;
+import org.apache.nifi.registry.security.authentication.generated.IdentityProviders;
+import org.apache.nifi.registry.security.authentication.generated.Property;
+import org.apache.nifi.registry.security.authentication.generated.Provider;
+import org.apache.nifi.registry.security.util.XmlUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
+import org.springframework.lang.Nullable;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLStreamReader;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Spring configuration that loads, instantiates and configures the identity providers declared in
+ * the identity provider configuration file, and exposes the provider selected in the NiFi Registry
+ * properties as the primary IdentityProvider bean. It also serves as the lookup through which
+ * configured providers can be found by identifier.
+ */
+@Configuration
+public class IdentityProviderFactory implements IdentityProviderLookup, DisposableBean {
+
+    private static final Logger logger = LoggerFactory.getLogger(IdentityProviderFactory.class);
+    private static final String LOGIN_IDENTITY_PROVIDERS_XSD = "/identity-providers.xsd";
+    private static final String JAXB_GENERATED_PATH = "org.apache.nifi.registry.security.authentication.generated";
+    private static final JAXBContext JAXB_CONTEXT = initializeJaxbContext();
+
+    /**
+     * Creates the JAXB context for the generated identity provider configuration classes.
+     *
+     * @return the JAXB context
+     * @throws RuntimeException wrapping the JAXBException if the context cannot be created
+     */
+    private static JAXBContext initializeJaxbContext() {
+        try {
+            return JAXBContext.newInstance(JAXB_GENERATED_PATH, IdentityProviderFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            // Chain the cause so the underlying JAXB failure stays visible.
+            throw new RuntimeException("Unable to create JAXBContext.", e);
+        }
+    }
+
+    private NiFiRegistryProperties properties;
+    private ExtensionManager extensionManager;
+    private SensitivePropertyProvider sensitivePropertyProvider;
+    private IdentityProvider identityProvider;
+    private final Map<String, IdentityProvider> identityProviders = new HashMap<>();
+
+    /**
+     * @param properties the NiFi Registry properties; must not be null
+     * @param extensionManager the extension manager used to find provider class loaders; must not be null
+     * @param sensitivePropertyProvider used to decrypt protected provider properties; may be null
+     * @throws IllegalStateException if properties or extensionManager is null
+     */
+    @Autowired
+    public IdentityProviderFactory(
+            final NiFiRegistryProperties properties,
+            final ExtensionManager extensionManager,
+            @Nullable final SensitivePropertyProvider sensitivePropertyProvider) {
+        this.properties = properties;
+        this.extensionManager = extensionManager;
+        this.sensitivePropertyProvider = sensitivePropertyProvider;
+
+        if (this.properties == null) {
+            throw new IllegalStateException("NiFiRegistryProperties cannot be null");
+        }
+
+        if (this.extensionManager == null) {
+            throw new IllegalStateException("ExtensionManager cannot be null");
+        }
+    }
+
+    /**
+     * @param identifier the identifier of a configured identity provider
+     * @return the provider registered under the identifier, or null if there is none
+     */
+    @Override
+    public IdentityProvider getIdentityProvider(String identifier) {
+        return identityProviders.get(identifier);
+    }
+
+    /**
+     * Returns the identity provider selected in the NiFi Registry properties, creating and
+     * configuring all declared providers on first use.
+     *
+     * @return the selected identity provider, or null if no identity provider identifier is configured
+     * @throws Exception if the configuration cannot be loaded, a provider cannot be created, or the
+     *         selected provider is not among the declared providers
+     */
+    @Bean
+    @Primary
+    public IdentityProvider getIdentityProvider() throws Exception {
+        if (identityProvider == null) {
+            // look up the login identity provider to use
+            final String loginIdentityProviderIdentifier = properties.getProperty(NiFiRegistryProperties.SECURITY_IDENTITY_PROVIDER);
+
+            // only load providers when a login identity provider identifier was specified
+            if (StringUtils.isNotBlank(loginIdentityProviderIdentifier)) {
+                final IdentityProviders loginIdentityProviderConfiguration = loadLoginIdentityProvidersConfiguration();
+
+                // create each login identity provider
+                for (final Provider provider : loginIdentityProviderConfiguration.getProvider()) {
+                    identityProviders.put(provider.getIdentifier(), createLoginIdentityProvider(provider.getIdentifier(), provider.getClazz()));
+                }
+
+                // configure each login identity provider; done in a second pass so every
+                // provider is already registered in the lookup when onConfigured is called
+                for (final Provider provider : loginIdentityProviderConfiguration.getProvider()) {
+                    final IdentityProvider instance = identityProviders.get(provider.getIdentifier());
+                    instance.onConfigured(loadLoginIdentityProviderConfiguration(provider));
+                }
+
+                // get the login identity provider instance
+                identityProvider = getIdentityProvider(loginIdentityProviderIdentifier);
+
+                // ensure it was found
+                if (identityProvider == null) {
+                    throw new Exception(String.format("The specified login identity provider '%s' could not be found.", loginIdentityProviderIdentifier));
+                }
+            }
+        }
+
+        return identityProvider;
+    }
+
+    /**
+     * Calls preDestruction on every identity provider that was created.
+     */
+    @Override
+    public void destroy() throws Exception {
+        // identityProviders is final and initialised at declaration, so it is never null
+        identityProviders.values().forEach(provider -> provider.preDestruction());
+    }
+
+    /**
+     * Unmarshals the identity provider configuration file, validating it against the bundled schema.
+     *
+     * @return the parsed identity provider configuration
+     * @throws Exception if the file does not exist or cannot be parsed
+     */
+    private IdentityProviders loadLoginIdentityProvidersConfiguration() throws Exception {
+        final File loginIdentityProvidersConfigurationFile = properties.getIdentityProviderConfigurationFile();
+
+        // load the providers from the specified file
+        if (!loginIdentityProvidersConfigurationFile.exists()) {
+            throw new Exception("Unable to find the login identity provider configuration file at " + loginIdentityProvidersConfigurationFile.getAbsolutePath());
+        }
+
+        try {
+            // find the schema
+            final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+            final Schema schema = schemaFactory.newSchema(IdentityProviders.class.getResource(LOGIN_IDENTITY_PROVIDERS_XSD));
+
+            // attempt to unmarshal
+            final XMLStreamReader xsr = XmlUtils.createSafeReader(new StreamSource(loginIdentityProvidersConfigurationFile));
+            final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller();
+            unmarshaller.setSchema(schema);
+            final JAXBElement<IdentityProviders> element = unmarshaller.unmarshal(xsr, IdentityProviders.class);
+            return element.getValue();
+        } catch (SAXException | JAXBException e) {
+            // Chain the cause so schema and unmarshalling errors are reported with their details.
+            throw new Exception("Unable to load the login identity provider configuration file at: " + loginIdentityProvidersConfigurationFile.getAbsolutePath(), e);
+        }
+    }
+
+    /**
+     * Instantiates an identity provider through its public no-argument constructor, using the
+     * extension class loader registered for the class, then performs method and field injection.
+     *
+     * @param identifier the identifier of the provider (not used during creation)
+     * @param loginIdentityProviderClassName the fully qualified class name of the provider
+     * @return the new, not yet configured, provider instance
+     * @throws Exception if no class loader is found for the class or the instance cannot be created
+     */
+    private IdentityProvider createLoginIdentityProvider(final String identifier, final String loginIdentityProviderClassName) throws Exception {
+        final ClassLoader classLoader = extensionManager.getExtensionClassLoader(loginIdentityProviderClassName);
+        if (classLoader == null) {
+            throw new IllegalStateException("Extension not found in any of the configured class loaders: " + loginIdentityProviderClassName);
+        }
+
+        // attempt to load the class
+        final Class<?> rawLoginIdentityProviderClass = Class.forName(loginIdentityProviderClassName, true, classLoader);
+        final Class<? extends IdentityProvider> loginIdentityProviderClass = rawLoginIdentityProviderClass.asSubclass(IdentityProvider.class);
+
+        // create a new instance
+        final Constructor<? extends IdentityProvider> constructor = loginIdentityProviderClass.getConstructor();
+        final IdentityProvider instance = constructor.newInstance();
+
+        // method injection
+        performMethodInjection(instance, loginIdentityProviderClass);
+
+        // field injection
+        performFieldInjection(instance, loginIdentityProviderClass);
+
+        return instance;
+    }
+
+    /**
+     * Builds the configuration context for a provider, decrypting any property that declares an
+     * encryption scheme.
+     *
+     * @param provider the provider definition from the configuration file
+     * @return the configuration context handed to the provider's onConfigured
+     */
+    private IdentityProviderConfigurationContext loadLoginIdentityProviderConfiguration(final Provider provider) {
+        final Map<String, String> providerProperties = new HashMap<>();
+
+        for (final Property property : provider.getProperty()) {
+            if (!StringUtils.isBlank(property.getEncryption())) {
+                final String decryptedValue = decryptValue(property.getValue(), property.getEncryption());
+                providerProperties.put(property.getName(), decryptedValue);
+            } else {
+                providerProperties.put(property.getName(), property.getValue());
+            }
+        }
+
+        return new StandardIdentityProviderConfigurationContext(provider.getIdentifier(), this, providerProperties);
+    }
+
+    /**
+     * Invokes single-argument methods annotated with IdentityProviderContext whose parameter
+     * accepts NiFiRegistryProperties, then repeats for each superclass that is an IdentityProvider.
+     *
+     * @param instance the provider instance to inject into
+     * @param loginIdentityProviderClass the class whose methods are inspected
+     */
+    private void performMethodInjection(final IdentityProvider instance, final Class<?> loginIdentityProviderClass)
+            throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+
+        for (final Method method : loginIdentityProviderClass.getMethods()) {
+            if (method.isAnnotationPresent(IdentityProviderContext.class)) {
+                // make the method accessible
+                final boolean isAccessible = method.isAccessible();
+                method.setAccessible(true);
+
+                try {
+                    final Class<?>[] argumentTypes = method.getParameterTypes();
+
+                    // look for setters (single argument)
+                    if (argumentTypes.length == 1) {
+                        final Class<?> argumentType = argumentTypes[0];
+
+                        // look for well known types
+                        if (NiFiRegistryProperties.class.isAssignableFrom(argumentType)) {
+                            // nifi properties injection
+                            method.invoke(instance, properties);
+                        }
+                    }
+                } finally {
+                    // restore the original accessibility
+                    method.setAccessible(isAccessible);
+                }
+            }
+        }
+
+        final Class<?> parentClass = loginIdentityProviderClass.getSuperclass();
+        if (parentClass != null && IdentityProvider.class.isAssignableFrom(parentClass)) {
+            performMethodInjection(instance, parentClass);
+        }
+    }
+
+    /**
+     * Sets declared fields annotated with IdentityProviderContext that are still null and whose
+     * type accepts NiFiRegistryProperties, then repeats for each superclass that is an IdentityProvider.
+     *
+     * @param instance the provider instance to inject into
+     * @param loginIdentityProviderClass the class whose declared fields are inspected
+     */
+    private void performFieldInjection(final IdentityProvider instance, final Class<?> loginIdentityProviderClass) throws IllegalArgumentException, IllegalAccessException {
+        for (final Field field : loginIdentityProviderClass.getDeclaredFields()) {
+            if (field.isAnnotationPresent(IdentityProviderContext.class)) {
+                // make the field accessible
+                final boolean isAccessible = field.isAccessible();
+                field.setAccessible(true);
+
+                try {
+                    // get the type
+                    final Class<?> fieldType = field.getType();
+
+                    // only consider this field if it isn't set yet
+                    if (field.get(instance) == null) {
+                        // look for well known types
+                        if (NiFiRegistryProperties.class.isAssignableFrom(fieldType)) {
+                            // nifi properties injection
+                            field.set(instance, properties);
+                        }
+                    }
+
+                } finally {
+                    // restore the original accessibility
+                    field.setAccessible(isAccessible);
+                }
+            }
+        }
+
+        final Class<?> parentClass = loginIdentityProviderClass.getSuperclass();
+        if (parentClass != null && IdentityProvider.class.isAssignableFrom(parentClass)) {
+            performFieldInjection(instance, parentClass);
+        }
+    }
+
+    /**
+     * Decrypts a protected property value with the configured sensitive property provider.
+     *
+     * @param cipherText the protected value
+     * @param encryptionScheme the scheme the value was protected with
+     * @return the decrypted value
+     * @throws SensitivePropertyProtectionException if no sensitive property provider is available
+     *         or its identifier key does not match the encryption scheme (ignoring case)
+     */
+    private String decryptValue(String cipherText, String encryptionScheme) throws SensitivePropertyProtectionException {
+        if (sensitivePropertyProvider == null) {
+            throw new SensitivePropertyProtectionException("Sensitive Property Provider dependency was never wired, so protected " +
+                    "properties cannot be decrypted. This usually indicates that a master key for this NiFi Registry was not " +
+                    "detected and configured during the bootstrap startup sequence. Contact the system administrator.");
+        }
+
+        if (!sensitivePropertyProvider.getIdentifierKey().equalsIgnoreCase(encryptionScheme)) {
+            throw new SensitivePropertyProtectionException("Identity Provider configuration XML was protected using " +
+                    encryptionScheme +
+                    ", but the configured Sensitive Property Provider supports " +
+                    sensitivePropertyProvider.getIdentifierKey() +
+                    ". Cannot configure this Identity Provider due to failing to decrypt protected configuration properties.");
+        }
+
+        return sensitivePropertyProvider.unprotect(cipherText);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-registry/blob/6f26290d/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/StandardIdentityProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/StandardIdentityProviderConfigurationContext.java b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/StandardIdentityProviderConfigurationContext.java
new file mode 100644
index 0000000..3e89dcc
--- /dev/null
+++ b/nifi-registry-core/nifi-registry-framework/src/main/java/org/apache/nifi/registry/security/authentication/StandardIdentityProviderConfigurationContext.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.registry.security.authentication;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Standard implementation of IdentityProviderConfigurationContext, holding a provider's identifier,
+ * the lookup for other identity providers, and the provider's configuration properties.
+ */
+public class StandardIdentityProviderConfigurationContext implements IdentityProviderConfigurationContext {
+
+    private final String identifier;
+    private final IdentityProviderLookup lookup;
+    private final Map<String, String> properties;
+
+    /**
+     * @param identifier the identifier of the provider being configured
+     * @param lookup the lookup giving access to identity providers by identifier
+     * @param properties the provider's configuration properties, keyed by name; the map is kept by reference
+     */
+    public StandardIdentityProviderConfigurationContext(String identifier, final IdentityProviderLookup lookup, Map<String, String> properties) {
+        this.properties = properties;
+        this.lookup = lookup;
+        this.identifier = identifier;
+    }
+
+    /**
+     * @return the identifier of the provider this context belongs to
+     */
+    @Override
+    public String getIdentifier() {
+        return this.identifier;
+    }
+
+    /**
+     * @return the lookup supplied at construction
+     */
+    @Override
+    public IdentityProviderLookup getIdentityProviderLookup() {
+        return this.lookup;
+    }
+
+    /**
+     * @return an unmodifiable view of the configuration properties
+     */
+    @Override
+    public Map<String, String> getProperties() {
+        final Map<String, String> readOnlyView = Collections.unmodifiableMap(this.properties);
+        return readOnlyView;
+    }
+
+    /**
+     * @param property the name of the property to read
+     * @return the value stored under the name, or null if the map has no such entry
+     */
+    @Override
+    public String getProperty(String property) {
+        return this.properties.get(property);
+    }
+
+}