Posted to issues@nifi.apache.org by "timeabarna (via GitHub)" <gi...@apache.org> on 2023/03/28 07:37:31 UTC

[GitHub] [nifi] timeabarna opened a new pull request, #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

timeabarna opened a new pull request, #7092:
URL: https://github.com/apache/nifi/pull/7092

   # Summary
   
   [NIFI-11327](https://issues.apache.org/jira/browse/NIFI-11327)
   
   # Tracking
   
   Please complete the following tracking steps prior to pull request creation.
   
   ### Issue Tracking
   
   - [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
   
   ### Pull Request Tracking
   
   - [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-00000`
   - [ ] Pull Request commit message starts with Apache NiFi Jira issue number, as such `NIFI-00000`
   
   ### Pull Request Formatting
   
   - [ ] Pull Request based on current revision of the `main` branch
   - [ ] Pull Request refers to a feature branch with one commit containing changes
   
   # Verification
   
   Please indicate the verification steps performed prior to pull request creation.
   
   ### Build
   
   - [ ] Build completed using `mvn clean install -P contrib-check`
     - [ ] JDK 11
     - [ ] JDK 17
   
   ### Licensing
   
   - [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
   - [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
   
   ### Documentation
   
   - [ ] Documentation formatting appears as expected in rendered files
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bbende commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "bbende (via GitHub)" <gi...@apache.org>.
bbende commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1169121670


##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java:
##########
@@ -295,6 +295,48 @@ public Response createFlowVersion(
         return Response.status(Response.Status.OK).entity(createdSnapshot).build();
     }
 
+    @POST
+    @Path("{flowId}/versions/preserveSourceProperties")

Review Comment:
   I think David's suggestion was that we should add an optional query param to the existing create method, so we wouldn't need this whole new method and instead a client could specify `?preserveSourceProperties=true` 
   
   @exceptionfactory is that what you were thinking?



##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketResource.java:
##########
@@ -92,6 +92,32 @@ public Response createBucket(
         return Response.status(Response.Status.OK).entity(createdBucket).build();
     }
 
+    @POST
+    @Path("migrate")

Review Comment:
   Same as the previous comment: use a query param on the existing create bucket method, and be consistent by calling it `preserveSourceProperties`



##########
nifi-registry/nifi-registry-core/nifi-registry-client/src/main/java/org/apache/nifi/registry/client/BucketClient.java:
##########
@@ -36,6 +36,16 @@ public interface BucketClient {
      */
     Bucket create(Bucket bucket) throws NiFiRegistryException, IOException;
 
+    /**
+     * Creates the given bucket. If migration parameter is true the bucket will be created with the provided identifier.
+     * False will set a generated identifier.
+     *
+     * @param bucket the bucket to create
+     * @param migration whether the operation is migration related
+     * @return the created bucket with the populated identifier
+     */
+    Bucket create(Bucket bucket, boolean migration) throws NiFiRegistryException, IOException;

Review Comment:
   Should call this `preserveSourceProperties` to be consistent



##########
nifi-registry/nifi-registry-core/nifi-registry-provider-api/src/main/java/org/apache/nifi/registry/hook/EventType.java:
##########
@@ -41,6 +41,14 @@ public enum EventType {
             EventFieldName.VERSION,
             EventFieldName.USER,
             EventFieldName.COMMENT),
+
+    MIGRATE_FLOW_VERSION(

Review Comment:
   See later comments about the query param approach. If we use the query param approach on the existing endpoint, then I think this probably goes away



##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/service/ServiceFacade.java:
##########
@@ -70,6 +70,8 @@ public interface ServiceFacade {
 
     Bucket deleteBucket(String bucketIdentifier, RevisionInfo revisionInfo);
 
+    Bucket migrateBucket(Bucket bucket);

Review Comment:
   This could go away, or could become `createBucket(Bucket bucket, boolean preserveSourceProperties)`
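A minimal sketch of the single-method approach, assuming the semantics described in the `BucketClient` javadoc in this PR: keep the provided identifier when the flag is true, otherwise generate one. `PreserveSourcePropertiesSketch` and `SimpleBucket` are hypothetical stand-ins, not the NiFi Registry `Bucket` or `StandardServiceFacade` classes, and rejecting a missing identifier is an assumption of this sketch:

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical, simplified stand-ins; not the real NiFi Registry Bucket or StandardServiceFacade.
public class PreserveSourcePropertiesSketch {

    // Minimal bucket model with only the fields needed to show identifier handling.
    static final class SimpleBucket {
        private String identifier;
        private final String name;

        SimpleBucket(final String identifier, final String name) {
            this.identifier = identifier;
            this.name = name;
        }

        String getIdentifier() { return identifier; }
        void setIdentifier(final String identifier) { this.identifier = identifier; }
        String getName() { return name; }
    }

    // One create method with a flag, instead of a separate migrateBucket method.
    static SimpleBucket createBucket(final SimpleBucket bucket, final boolean preserveSourceProperties) {
        Objects.requireNonNull(bucket, "bucket");
        if (preserveSourceProperties) {
            // Migration: keep the source identifier (assumption: it must be present)
            final String id = bucket.getIdentifier();
            if (id == null || id.isEmpty()) {
                throw new IllegalArgumentException("identifier is required when preserveSourceProperties=true");
            }
        } else {
            // Regular create: always assign a newly generated identifier
            bucket.setIdentifier(UUID.randomUUID().toString());
        }
        return bucket;
    }

    // Existing callers keep the old behavior because the flag defaults to false.
    static SimpleBucket createBucket(final SimpleBucket bucket) {
        return createBucket(bucket, false);
    }
}
```

Keeping the one-argument overload and having it delegate with `false` leaves existing callers unaffected, which is the main benefit of a flag over a separate `migrateBucket` method.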



##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/service/StandardServiceFacade.java:
##########
@@ -202,6 +202,22 @@ public Bucket deleteBucket(final String bucketIdentifier, final RevisionInfo rev
                 () -> registryService.deleteBucket(bucketIdentifier));
     }
 
+    @Override
+    public Bucket migrateBucket(final Bucket bucket) {

Review Comment:
   I think this method can go away, right? It would just use the regular create method.



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.shaded.com.google.common.collect.ComparisonChain;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private static final String FILE_NAME_SEPARATOR = "_";
+    private static final String STORAGE_LOCATION_URL = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+    private static final ObjectMapper MAPPER = JacksonUtils.getObjectMapper();
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP_EXISTING.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows and flow versions will be created. " +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP_EXISTING));
+        final boolean isInteractive = getContext().isInteractive();
+
+        //Gather all buckets and create a map for easier search by bucket name
+        final Map<String, String> bucketMap = getBucketMap(client, isInteractive);
+
+        // Gather all flows and create a map for easier search by flow name.
+        // As flow name is only unique within the same bucket we need to use the bucket id in the key as well
+        final Map<Pair<String, String>, String> flowMap = getFlowMap(client, bucketMap, isInteractive);
+        final Map<Pair<String, String>, String> flowCreated = new HashMap<>();
+
+        // Gather all flow versions and create a map for easier search by flow id
+        final Map<String, List<Integer>> versionMap = getVersionMap(client, flowMap, isInteractive);
+
+        // Create file path list
+        final List<VersionFileMetaData> files = getFilePathList(properties);
+
+        // As we need to keep the version order the list needs to be sorted
+        files.sort((o1, o2) -> ComparisonChain.start()
+                .compare(o1.getBucketName(), o2.getBucketName())
+                .compare(o1.getFlowName(), o2.getFlowName())
+                .compare(o1.getVersion(), o2.getVersion())
+                .result());
+
+        for (VersionFileMetaData file : files) {
+            final String inputSource = file.getInputSource();
+            final String fileContent = getInputSourceContent(inputSource);
+            final VersionedFlowSnapshot snapshot = MAPPER.readValue(fileContent, VersionedFlowSnapshot.class);
+
+            final String bucketName = snapshot.getBucket().getName();
+            final String bucketDescription = snapshot.getBucket().getDescription();
+            final String flowName = snapshot.getFlow().getName();
+            final String flowDescription = snapshot.getFlow().getDescription();
+            final int flowVersion = snapshot.getSnapshotMetadata().getVersion();
+            // The original bucket and flow ids must be kept otherwise NiFi won't be able to synchronize with the NiFi Registry
+            final String flowId = snapshot.getFlow().getIdentifier();
+            final String bucketId = snapshot.getBucket().getIdentifier();
+
+            // Create bucket if missing
+            if (bucketMap.containsKey(bucketName)) {
+                printMessage(isInteractive, bucketName + SKIPPING_BUCKET_CREATION);
+            } else {
+                //The original bucket id must be kept otherwise NiFi won't be able to synchronize with the NiFi Registry
+                createBucket(client, bucketMap, bucketName, bucketDescription, bucketId);
+            }
+
+            // Create flow if missing
+            if (flowMap.containsKey(new ImmutablePair<>(bucketId, flowName))) {
+                if (skip) {
+                    printMessage(isInteractive, flowName + SKIPPING_IMPORT);
+                    continue;
+                } else {
+                    //flowId
+                    printMessage(isInteractive, flowName + SKIPPING_FLOW_CREATION);
+                }
+            } else if (!flowCreated.containsKey(new ImmutablePair<>(bucketId, flowName))) {
+                createFlow(client, flowCreated, flowId, flowName, flowDescription, bucketId);
+            }
+
+            // Create missing flow versions
+            if (!versionMap.getOrDefault(flowId, Collections.emptyList()).contains(flowVersion)) {
+                //update storage location
+                final String registryUrl = getRequiredArg(properties, CommandOption.URL);
+
+                updateStorageLocation(snapshot.getFlowContents().getVersionedFlowCoordinates(), registryUrl);
+                for (VersionedProcessGroup processGroup : snapshot.getFlowContents().getProcessGroups()) {
+                    updateStorageLocation(processGroup.getVersionedFlowCoordinates(), registryUrl);
+                }

Review Comment:
   Right now I think this will only handle the root group and first-level groups, but there could be a child group under version control at any arbitrary level, for example:
   ```
   Main PG
     - PG Level 1 (not under VC)
       - PG Level 2 (not under VC) 
         - PG Level 3 (Version Controlled) 
   ```
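One way to cover arbitrary depth is a recursive walk. This is a sketch on a hypothetical `Group` type in which a boolean stands in for non-null `versionedFlowCoordinates`; with the real `VersionedProcessGroup`, the same recursion would presumably go through `getProcessGroups()` and update each group's `getVersionedFlowCoordinates()`:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified process group tree; the real VersionedProcessGroup has many more fields.
public class VersionedGroupWalker {

    static final class Group {
        final String name;
        final boolean underVersionControl; // stands in for "versionedFlowCoordinates != null"
        final List<Group> children = new ArrayList<>();

        Group(final String name, final boolean underVersionControl) {
            this.name = name;
            this.underVersionControl = underVersionControl;
        }

        // Adds a child and returns this group, so trees can be built inline.
        Group add(final Group child) {
            children.add(child);
            return this;
        }
    }

    // Depth-first walk that visits every descendant, not only the first level.
    static List<Group> findVersionControlled(final Group root) {
        final List<Group> found = new ArrayList<>();
        collect(root, found);
        return found;
    }

    private static void collect(final Group group, final List<Group> found) {
        if (group.underVersionControl) {
            found.add(group);
        }
        for (final Group child : group.children) {
            collect(child, found);
        }
    }
}
```

Recursion depth follows process group nesting, which is normally shallow; an explicit `Deque` would avoid stack growth for unusually deep trees.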





[GitHub] [nifi] bbende commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "bbende (via GitHub)" <gi...@apache.org>.
bbende commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1157769951


##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ExportAllFlows.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.registry.VersionedFlowSnapshotsResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class ExportAllFlows extends AbstractNiFiRegistryCommand<VersionedFlowSnapshotsResult> {
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private static final String EXPORTING_FLOW_VERSIONS = "Exporting flow versions...";
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+    private final ExportFlowVersion exportFlowVersion;
+
+    public ExportAllFlows() {
+        super("export-all-flows", VersionedFlowSnapshotsResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+        this.exportFlowVersion = new ExportFlowVersion();
+    }
+
+    @Override
+    public void doInitialize(final Context context) {
+        addOption(CommandOption.OUTPUT_DIR.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+        exportFlowVersion.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "List all the buckets, for each bucket, list all the flows, for each flow, list all versions and export each version. " +
+                "Versions will be saved in the provided target directory.";
+    }
+
+    @Override
+    public VersionedFlowSnapshotsResult doExecute(NiFiRegistryClient client, Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final String outputDirectory = getRequiredArg(properties, CommandOption.OUTPUT_DIR);
+        final boolean isInteractive = getContext().isInteractive();
+
+        // Gather all buckets and create a map for quick access by bucket id
+        final Map<String, Bucket> bucketMap = getBucketMap(client, isInteractive);
+
+        // Gather all flows and create a map for quick access by flow id
+        final Map<String, VersionedFlow> flowMap = getFlowMap(client, bucketMap, isInteractive);
+
+        // Gather all versions for all the flows
+        final List<VersionedFlowSnapshotMetadata> versionedFlowSnapshotMetadataList = getVersionedFlowSnapshotMetadataList(client, flowMap, isInteractive);
+
+        // Prepare flow version exports
+        final List<VersionedFlowSnapshot> versionedFlowSnapshotList = getVersionedFlowSnapshotResults(client, outputDirectory, bucketMap, flowMap, versionedFlowSnapshotMetadataList, isInteractive);

Review Comment:
   I think the way the result classes worked kind of led it to be implemented this way, but I agree with David's concern that we should see if we can write out the snapshots one by one. Maybe we can pass some kind of iterator/helper object to the result class that allows it to keep calling `next()` and writing out each result?
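A rough sketch of the iterator idea, assuming the snapshot metadata list stays in memory (it is small) while each full snapshot is fetched only when `next()` is called and written before the next fetch. `LazySnapshotIterator`, `ExportedSnapshot`, `writeAll`, and the `key + ".json"` file naming are hypothetical, not the toolkit's `VersionedFlowSnapshotsResult` API; the `fetcher` function stands in for whatever retrieves one snapshot, such as the logic behind `ExportFlowVersion`:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical sketch of a streaming export; not the toolkit's result classes.
public class StreamingExportSketch {

    // One exported item: a file-name key and its serialized content.
    static final class ExportedSnapshot {
        final String key;
        final String content;

        ExportedSnapshot(final String key, final String content) {
            this.key = key;
            this.content = content;
        }
    }

    // Fetches each snapshot only when next() is called, so the full list is never held in memory.
    static final class LazySnapshotIterator implements Iterator<ExportedSnapshot> {
        private final Iterator<String> keys;
        private final Function<String, String> fetcher;

        LazySnapshotIterator(final List<String> keys, final Function<String, String> fetcher) {
            this.keys = keys.iterator();
            this.fetcher = fetcher;
        }

        @Override
        public boolean hasNext() {
            return keys.hasNext();
        }

        @Override
        public ExportedSnapshot next() {
            if (!keys.hasNext()) {
                throw new NoSuchElementException();
            }
            final String key = keys.next();
            return new ExportedSnapshot(key, fetcher.apply(key));
        }
    }

    // What a result class could do: pull one snapshot, write it, then pull the next.
    static int writeAll(final Iterator<ExportedSnapshot> snapshots, final Path outputDirectory) throws IOException {
        Files.createDirectories(outputDirectory);
        int written = 0;
        while (snapshots.hasNext()) {
            final ExportedSnapshot snapshot = snapshots.next();
            Files.write(outputDirectory.resolve(snapshot.key + ".json"), snapshot.content.getBytes(StandardCharsets.UTF_8));
            written++;
        }
        return written;
    }
}
```

Because the result class only sees an `Iterator`, it does not need to know how snapshots are retrieved, and peak memory is bounded by one snapshot rather than the whole registry.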



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.shaded.com.google.common.collect.ComparisonChain;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+    private final ImportFlowVersion importFlowVersion;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+        this.importFlowVersion = new ImportFlowVersion();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+        importFlowVersion.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows and flow versions will be created. " +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP));
+        final boolean isInteractive = getContext().isInteractive();
+
+        //Gather all buckets and create a map for easier search by bucket name
+        final Map<String, String> bucketMap = getBucketMap(client, isInteractive);
+
+        // Gather all flows and create a map for easier search by flow name.
+        // As flow name is only unique within the same bucket we need to use the bucket id in the key as well
+        final Map<Pair<String, String>, String> flowMap = getFlowMap(client, bucketMap, isInteractive);
+        final Map<Pair<String, String>, String> flowCreated = new HashMap<>();
+
+        // Gather all flow versions and create a map for easier search by flow id
+        final Map<String, List<Integer>> versionMap = getVersionMap(client, flowMap, isInteractive);
+
+        // Create file path list
+        final List<String> files = getFilePathList(properties);
+
+        // Deserialize file content
+        final List<ImportedSnapshot> importedSnapshots = deserializeSnapshots(files);
+
+        // As we need to keep the version order the snapshot list needs to be sorted
+        importedSnapshots.sort((o1, o2) -> ComparisonChain.start()
+                .compare(o1.getSnapshot().getBucket().getName(), o2.getSnapshot().getBucket().getName())
+                .compare(o1.getSnapshot().getFlow().getName(), o2.getSnapshot().getFlow().getName())
+                .compare(o1.getSnapshot().getSnapshotMetadata().getVersion(), o2.getSnapshot().getSnapshotMetadata().getVersion())
+                .result());
+
+        for (final ImportedSnapshot snapshot : importedSnapshots) {
+
+            final String inputSource = snapshot.getInputSource();
+            final String bucketName = snapshot.getSnapshot().getBucket().getName();
+            final String bucketDescription = snapshot.getSnapshot().getBucket().getDescription();
+            final String flowName = snapshot.getSnapshot().getFlow().getName();
+            final String flowDescription = snapshot.getSnapshot().getFlow().getDescription();
+            final int flowVersion = snapshot.getSnapshot().getSnapshotMetadata().getVersion();
+            // The original bucket and flow ids must be kept otherwise NiFi won't be able to synchronize with the NiFi Registry
+            final String bucketId = snapshot.getSnapshot().getBucket().getIdentifier();
+            final String flowId = snapshot.getSnapshot().getFlow().getIdentifier();
+
+            // Create bucket if missing
+            if (bucketMap.containsKey(bucketName)) {
+                printMessage(isInteractive, bucketName + SKIPPING_BUCKET_CREATION);
+            } else {
+                createBucket(client, bucketMap, bucketName, bucketDescription, bucketId);
+            }
+
+            // Create flow if missing
+            if (flowMap.containsKey(new ImmutablePair<>(bucketId, flowName))) {
+                if (skip) {
+                    printMessage(isInteractive, flowName + SKIPPING_IMPORT);
+                    continue;
+                } else {
+                    printMessage(isInteractive, flowName + SKIPPING_FLOW_CREATION);
+                }
+            } else if (!flowCreated.containsKey(new ImmutablePair<>(bucketId, flowName))) {
+                createFlow(client, flowCreated, flowId, flowName, flowDescription, bucketId);
+            }
+
+            // Create missing flow versions
+            if (!versionMap.getOrDefault(flowId, Collections.emptyList()).contains(flowVersion)) {
+                createFlowVersion(client, inputSource, flowId);

Review Comment:
   What is the expectation for handling nested versioning during a migration?
   
   For example, `Outer PG` is under version control, and inside it `Child PG` is also under version control. In the registry, the snapshot of `Outer PG` will contain a process group for `Child PG` with coordinates pointing to the registry it came from:
   ```
   "versionedFlowCoordinates" : {
           "storageLocation" : "http://localhost:18080/nifi-registry-api/buckets/876e0978-ffe5-43c5-ab2c-69dd0a8d3c52/flows/412512c1-c95f-4a2a-9df3-8a988e326df7/versions/1",
           "registryUrl" : "http://localhost:18080",
           "bucketId" : "876e0978-ffe5-43c5-ab2c-69dd0a8d3c52",
           "flowId" : "412512c1-c95f-4a2a-9df3-8a988e326df7",
           "version" : 1
         }
   ```
   
   If these commands were being used to replicate/move flows between two different registries at different locations, then when moving `Outer PG` to the new registry, it would need to update `storageLocation` and `registryUrl` to point to the new registry. Otherwise, when a NiFi instance pulls `Outer PG` from the new registry, it will come to `Child PG` and either try to pull it from the old registry or fail because no registry client exists for the old registry's URL.
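A sketch of the rewrite, assuming only coordinates whose `registryUrl` matches the source registry should change; leaving coordinates that point at a third registry untouched is a design assumption. The storage location layout follows both the JSON above and `STORAGE_LOCATION_URL` in `ImportAllFlows`; `CoordinatesRewriter`, `Coordinates`, and the target URL `https://registry.example.com:18443` are hypothetical:

```java
// Hypothetical sketch; field names mirror the "versionedFlowCoordinates" JSON, but this is not the NiFi class.
public class CoordinatesRewriter {

    // Same layout as STORAGE_LOCATION_URL in the PR's ImportAllFlows
    private static final String STORAGE_LOCATION_FORMAT = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";

    static final class Coordinates {
        String storageLocation;
        String registryUrl;
        final String bucketId;
        final String flowId;
        final int version;

        Coordinates(final String registryUrl, final String bucketId, final String flowId, final int version) {
            this.registryUrl = registryUrl;
            this.bucketId = bucketId;
            this.flowId = flowId;
            this.version = version;
            this.storageLocation = buildStorageLocation(registryUrl, bucketId, flowId, version);
        }
    }

    static String buildStorageLocation(final String registryUrl, final String bucketId, final String flowId, final int version) {
        // Drop a trailing slash so the result never contains "//nifi-registry-api"
        final String base = registryUrl.endsWith("/") ? registryUrl.substring(0, registryUrl.length() - 1) : registryUrl;
        return String.format(STORAGE_LOCATION_FORMAT, base, bucketId, flowId, version);
    }

    // Points coordinates that referenced the source registry at the target registry; returns true if changed.
    static boolean rewrite(final Coordinates coordinates, final String sourceRegistryUrl, final String targetRegistryUrl) {
        if (coordinates == null || !sourceRegistryUrl.equals(coordinates.registryUrl)) {
            return false; // not version controlled, or points at some other registry
        }
        coordinates.registryUrl = targetRegistryUrl;
        coordinates.storageLocation = buildStorageLocation(targetRegistryUrl, coordinates.bucketId, coordinates.flowId, coordinates.version);
        return true;
    }
}
```

Bucket id, flow id, and version are kept as-is, which matches the PR's approach of preserving the original identifiers so NiFi can still synchronize.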



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.shaded.com.google.common.collect.ComparisonChain;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+    private final ImportFlowVersion importFlowVersion;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+        this.importFlowVersion = new ImportFlowVersion();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+        importFlowVersion.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows and flow versions will be created." +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP));
+        final boolean isInteractive = getContext().isInteractive();
+
+        //Gather all buckets and create a map for easier search by bucket name
+        final Map<String, String> bucketMap = getBucketMap(client, isInteractive);
+
+        // Gather all flows and create a map for easier search by flow name.
+        // As flow name is only unique within the same bucket we need to use the bucket id in the key as well
+        final Map<Pair<String, String>, String> flowMap = getFlowMap(client, bucketMap, isInteractive);
+        final Map<Pair<String, String>, String> flowCreated = new HashMap<>();
+
+        // Gather all flow versions and create a map for easier search by flow id
+        final Map<String, List<Integer>> versionMap = getVersionMap(client, flowMap, isInteractive);
+
+        // Create file path list
+        final List<String> files = getFilePathList(properties);
+
+        // Deserialize file content
+        final List<ImportedSnapshot> importedSnapshots = deserializeSnapshots(files);

Review Comment:
   I'm wondering if we could change the filenames produced by export to contain the three fields we are sorting on below, so that we can sort the `List<String> files` in the same order?
   
   So exported filenames would be:
   ```
   <Bucket-Name>-<Flow-Name>-<Version>.json
   ```
   This would also have the added benefit of being human readable for anyone looking at the exported files.
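A sketch of how the import side could order files by name alone. It assumes the prefix-plus-underscore scheme used by `VersionedFlowSnapshotsResult` elsewhere in this thread (with any `_` inside bucket and flow names replaced by `-`) rather than the hyphenated form suggested above, since hyphens can also appear inside bucket and flow names. `ExportFileOrder` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ExportFileOrder {
    // Assumed naming scheme: <prefix><bucket>_<flow>_<version>, with '_' inside
    // bucket and flow names already replaced so that '_' is an unambiguous separator.
    static final String PREFIX = "toolkit_registry_export_all_";
    static final String SEPARATOR = "_";

    static final class ExportFileName {
        final String fileName;
        final String bucket;
        final String flow;
        final int version;

        ExportFileName(String fileName, String bucket, String flow, int version) {
            this.fileName = fileName;
            this.bucket = bucket;
            this.flow = flow;
            this.version = version;
        }
    }

    static ExportFileName parse(String fileName) {
        if (!fileName.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Unexpected export file name: " + fileName);
        }
        String[] parts = fileName.substring(PREFIX.length()).split(SEPARATOR);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected <bucket>_<flow>_<version>: " + fileName);
        }
        return new ExportFileName(fileName, parts[0], parts[1], Integer.parseInt(parts[2]));
    }

    // Orders by bucket name, then flow name, then numeric version, so version 10
    // comes after version 2 (a plain string sort would put "10" first).
    static List<String> sortForImport(List<String> fileNames) {
        List<ExportFileName> parsed = new ArrayList<>();
        for (String fileName : fileNames) {
            parsed.add(parse(fileName));
        }
        parsed.sort(Comparator.comparing((ExportFileName f) -> f.bucket)
                .thenComparing(f -> f.flow)
                .thenComparingInt(f -> f.version));
        List<String> sorted = new ArrayList<>();
        for (ExportFileName f : parsed) {
            sorted.add(f.fileName);
        }
        return sorted;
    }
}
```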



[GitHub] [nifi] bbende merged pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "bbende (via GitHub)" <gi...@apache.org>.
bbende merged PR #7092:
URL: https://github.com/apache/nifi/pull/7092


[GitHub] [nifi] timeabarna commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "timeabarna (via GitHub)" <gi...@apache.org>.
timeabarna commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1171178295


##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/registry/VersionedFlowSnapshotsResult.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.result.registry;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.toolkit.cli.api.WritableResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Iterator;
+
+/**
+ * Result for a list of VersionedFlowSnapshots.
+ *
+ * If this result was created with a non-null exportDirectoryName, then the write method will ignore
+ * the passed in PrintStream, and will write the serialized snapshot to the given directory.
+ * The file name will be generated from the flow name and its version.
+ *
+ * If this result was created with a null exportDirectoryName, then the write method will write the
+ * serialized snapshots to the given PrintStream.
+ */
+public class VersionedFlowSnapshotsResult implements WritableResult<Iterator<VersionedFlowSnapshot>> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all";
+    private static final String EXPORT_FILE_NAME = "%s/%s_%s_%s_%d";
+    private static final String SEPARATOR = "_";
+    private static final String REPLACEMENT = "-";
+    private final Iterator<VersionedFlowSnapshot> versionedFlowSnapshots;
+    private final String exportDirectoryName;
+
+    public VersionedFlowSnapshotsResult(final Iterator<VersionedFlowSnapshot> versionedFlowSnapshots, final String exportDirectoryName) {
+        this.versionedFlowSnapshots = versionedFlowSnapshots;
+        this.exportDirectoryName = exportDirectoryName;
+        Validate.notNull(this.versionedFlowSnapshots);
+    }
+
+    @Override
+    public Iterator<VersionedFlowSnapshot> getResult() {
+        return versionedFlowSnapshots;
+    }
+
+    @Override
+    public void write(final PrintStream output) throws IOException {
+        while (versionedFlowSnapshots.hasNext()) {
+            final VersionedFlowSnapshot versionedFlowSnapshot = versionedFlowSnapshots.next();
+            if (exportDirectoryName != null) {
+                final String bucketName = versionedFlowSnapshot.getBucket().getName().replaceAll(SEPARATOR, REPLACEMENT);
+                final String flowName = versionedFlowSnapshot.getFlow().getName().replaceAll(SEPARATOR, REPLACEMENT);
+                final int version = versionedFlowSnapshot.getSnapshotMetadata().getVersion();
+                final String exportFileName = String.format(EXPORT_FILE_NAME, exportDirectoryName, FILE_NAME_PREFIX, bucketName, flowName, version);
+                try (final OutputStream resultOut = Files.newOutputStream(Paths.get(exportFileName))) {
+                    JacksonUtils.write(versionedFlowSnapshot, resultOut);
+                }

Review Comment:
   The stack trace is strange, as it does not mention `Paths.get`, which is probably what causes this exception. I've added a catch block and also updated the documentation to state that the directory must exist.
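For what it's worth, in Java NIO `Paths.get` only parses the string (throwing `InvalidPathException` for a malformed path) and never checks that the directory exists; when the parent directory is missing, it is `Files.newOutputStream` that throws `NoSuchFileException`, which may explain the stack trace. A minimal sketch of an up-front check follows. `ExportWriter.writeExport` is a hypothetical helper, and its `String content` argument stands in for the `JacksonUtils.write` call in the diff.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class ExportWriter {
    static Path writeExport(String exportDirectoryName, String fileName, String content) throws IOException {
        Path directory = Paths.get(exportDirectoryName);
        // Fail early with a readable message instead of a NoSuchFileException from newOutputStream
        if (!Files.isDirectory(directory)) {
            throw new IOException("Export directory does not exist or is not a directory: " + directory.toAbsolutePath());
        }
        // resolve() avoids hand-building paths with a hard-coded "/" separator
        Path target = directory.resolve(fileName);
        try (OutputStream out = Files.newOutputStream(target)) {
            out.write(content.getBytes(StandardCharsets.UTF_8));
        }
        return target;
    }
}
```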



[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1169146473


##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java:
##########
@@ -295,6 +295,48 @@ public Response createFlowVersion(
         return Response.status(Response.Status.OK).entity(createdSnapshot).build();
     }
 
+    @POST
+    @Path("{flowId}/versions/preserveSourceProperties")

Review Comment:
   Yes, that's what I had in mind, not a separate path.



[GitHub] [nifi] bbende commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "bbende (via GitHub)" <gi...@apache.org>.
bbende commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1157368613


##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java:
##########
@@ -295,6 +295,48 @@ public Response createFlowVersion(
         return Response.status(Response.Status.OK).entity(createdSnapshot).build();
     }
 
+    @POST
+    @Path("{flowId}/versions/migrate")

Review Comment:
   We had discussed this offline and landed on the `/migrate` approach to clearly separate the usage of the API. The idea was that no one should really be calling the `/migrate` end-point, except for a specific utility that wants to move data between two registries. 
   
   However, I agree that it generally does not follow REST standards, so I'm not that opposed to using a query parameter on the existing end-points. If we were to go with a query parameter then it should probably be `preserveSourceProperties` because it is more general than just the `author`. 
   
   We have 3 different entities that would be migrated:
   
   - `Buckets` - need to preserve id, rather than generating new
   - `Flows` - need to preserve id, rather than generating new
   - `Flow Versions` - need to preserve id and author, rather than generating new and using authenticated user
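A minimal sketch of the per-entity rule in the list above, assuming a single boolean flag named `preserveSourceProperties` as proposed here. `SourcePropertyPolicy` and its methods are hypothetical and not part of NiFi Registry.

```java
import java.util.UUID;

final class SourcePropertyPolicy {
    // Buckets, flows and flow versions: keep the source id, or generate a new one
    static String resolveId(String sourceId, boolean preserveSourceProperties) {
        return preserveSourceProperties && sourceId != null ? sourceId : UUID.randomUUID().toString();
    }

    // Flow versions only: keep the original author, or record the authenticated user
    static String resolveAuthor(String sourceAuthor, String authenticatedUser, boolean preserveSourceProperties) {
        return preserveSourceProperties && sourceAuthor != null ? sourceAuthor : authenticatedUser;
    }
}
```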
   
   



[GitHub] [nifi] timeabarna commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "timeabarna (via GitHub)" <gi...@apache.org>.
timeabarna commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1171164642


##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.shaded.com.google.common.collect.ComparisonChain;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private static final String FILE_NAME_SEPARATOR = "_";
+    private static final String STORAGE_LOCATION_URL = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+    private static final ObjectMapper MAPPER = JacksonUtils.getObjectMapper();
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP_EXISTING.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows and flow versions will be created." +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP_EXISTING));

Review Comment:
   Is there a situation in one of the use cases where you would want to use --skipExisting true and not get v3 imported?
   The only scenario that comes to mind is when there are two buckets in Registry A and only one of these buckets exists in Registry B. No continuous synchronisation is needed; however, the missing bucket can be created with all its flows and versions without changing anything in the existing bucket.
   
   You are right, though, that this does not fit either of the two provided use cases.
   
   I am fine with having --skipExisting as a no-arg option for now; however, I would keep the option for possible future enhancements.
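A sketch of the behaviour discussed above, assuming existing versions are matched by version number within an already-matched flow: with `--skipExisting`, versions 1 and 2 are skipped and v3 is still imported, while a bucket missing from the target simply has nothing to skip. Failing on a conflict when the flag is absent is an assumption for illustration, and `SkipExistingPlan` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class SkipExistingPlan {
    // Returns the exported versions that still need to be imported into the target flow
    static List<Integer> versionsToImport(List<Integer> exportedVersions, Set<Integer> existingVersions, boolean skipExisting) {
        List<Integer> toImport = new ArrayList<>();
        for (Integer version : exportedVersions) {
            if (existingVersions.contains(version)) {
                if (!skipExisting) {
                    // Assumed behaviour without the flag: stop on the first conflict
                    throw new IllegalStateException("Flow version " + version + " already exists");
                }
                continue; // already present in the target registry, skip it
            }
            toImport.add(version);
        }
        return toImport;
    }
}
```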



[GitHub] [nifi] timeabarna commented on pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "timeabarna (via GitHub)" <gi...@apache.org>.
timeabarna commented on PR #7092:
URL: https://github.com/apache/nifi/pull/7092#issuecomment-1512843623

   Thanks @bbende and @exceptionfactory for your help, I've updated the PR.


[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1155979058


##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java:
##########
@@ -295,6 +295,48 @@ public Response createFlowVersion(
         return Response.status(Response.Status.OK).entity(createdSnapshot).build();
     }
 
+    @POST
+    @Path("{flowId}/versions/migrate")

Review Comment:
   Having a verb `migrate` in the path does not follow general REST API conventions, where the path should be a noun and the HTTP method is the verb. One option could be to make `migrate` a `Boolean` query parameter on the `/versions` resource method. However, the word `migrate` itself does not clearly convey what is happening, so perhaps another parameter name would be better. As the method name mentions keeping the author, a better approach might be to add a query parameter named something like `preserveAuthor`. If there are other properties to be preserved, perhaps it could be named `preserveSourceProperties`.
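A sketch of the request shape being proposed: the resource path stays a noun (`.../versions`), `POST` supplies the verb, and the behaviour switch travels as a query parameter. The path layout mirrors the `storageLocation` values earlier in this thread, the parameter name is the `preserveSourceProperties` suggestion above, and `FlowVersionUris.createVersionUri` is a hypothetical helper, not part of the NiFi Registry client.

```java
import java.net.URI;

final class FlowVersionUris {
    // Same resource for both cases; only the query parameter changes the behaviour
    static URI createVersionUri(String registryUrl, String bucketId, String flowId, boolean preserveSourceProperties) {
        String base = String.format("%s/nifi-registry-api/buckets/%s/flows/%s/versions", registryUrl, bucketId, flowId);
        return URI.create(preserveSourceProperties ? base + "?preserveSourceProperties=true" : base);
    }
}
```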



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java:
##########
@@ -164,7 +164,9 @@ public enum CommandOption {
     OUTPUT_TYPE("ot", "outputType", "The type of output to produce (json or simple)", true),
     VERBOSE("verbose", "verbose", "Indicates that verbose output should be provided", false),
     RECURSIVE("r", "recursive", "Indicates the command should perform the action recursively", false),
-    HELP("h", "help", "Help", false)
+    HELP("h", "help", "Help", false),
+    SKIP("skip", "skip", "Indicates to skip an operation if target object exists", true),

Review Comment:
   The word `skip` does not clearly convey the intent of this parameter. Perhaps `skip-existing` could work, or some other combination of words that makes it clear what should be skipped.



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/CommandOption.java:
##########
@@ -164,7 +164,9 @@ public enum CommandOption {
     OUTPUT_TYPE("ot", "outputType", "The type of output to produce (json or simple)", true),
     VERBOSE("verbose", "verbose", "Indicates that verbose output should be provided", false),
     RECURSIVE("r", "recursive", "Indicates the command should perform the action recursively", false),
-    HELP("h", "help", "Help", false)
+    HELP("h", "help", "Help", false),
+    SKIP("skip", "skip", "Indicates to skip an operation if target object exists", true),
+    KEEP("keep", "keep", "Indicates to keep the original format or type of an object", true)

Review Comment:
   Similar to `skip`, the word `keep` by itself does not indicate the meaning. Perhaps `preserve-format` or `preserve-type` could be better.



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ExportAllFlows.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.registry.VersionedFlowSnapshotsResult;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public class ExportAllFlows extends AbstractNiFiRegistryCommand<VersionedFlowSnapshotsResult> {
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private static final String EXPORTING_FLOW_VERSIONS = "Exporting flow versions...";
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+    private final ExportFlowVersion exportFlowVersion;
+
+    public ExportAllFlows() {
+        super("export-all-flows", VersionedFlowSnapshotsResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+        this.exportFlowVersion = new ExportFlowVersion();
+    }
+
+    @Override
+    public void doInitialize(final Context context) {
+        addOption(CommandOption.OUTPUT_DIR.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+        exportFlowVersion.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "List all the buckets, for each bucket, list all the flows, for each flow, list all versions and export each version." +
+                "Versions will be saved in the provided target directory.";
+    }
+
+    @Override
+    public VersionedFlowSnapshotsResult doExecute(NiFiRegistryClient client, Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final String outputDirectory = getRequiredArg(properties, CommandOption.OUTPUT_DIR);
+        final boolean isInteractive = getContext().isInteractive();
+
+        // Gather all buckets and create a map for quick access by bucket id
+        final Map<String, Bucket> bucketMap = getBucketMap(client, isInteractive);
+
+        // Gather all flows and create a map for quick access by flow id
+        final Map<String, VersionedFlow> flowMap = getFlowMap(client, bucketMap, isInteractive);
+
+        // Gather all versions for all the flows
+        final List<VersionedFlowSnapshotMetadata> versionedFlowSnapshotMetadataList = getVersionedFlowSnapshotMetadataList(client, flowMap, isInteractive);
+
+        // Prepare flow version exports
+        final List<VersionedFlowSnapshot> versionedFlowSnapshotList = getVersionedFlowSnapshotResults(client, outputDirectory, bucketMap, flowMap, versionedFlowSnapshotMetadataList, isInteractive);

Review Comment:
   Are there potential memory concerns with this approach? What happens if there are hundreds of large flows?
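One way to bound memory, sketched under assumptions: keep only the lightweight version metadata in memory and fetch each full snapshot when the writer asks for it, so roughly one snapshot is held at a time. `LazySnapshotIterator` is hypothetical and its `fetcher` function stands in for a registry client call; it only helps if the consumer writes each snapshot before requesting the next, rather than collecting them into a list.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical lazy iterator: K is a small key (e.g. version metadata),
// S is the large object fetched on demand (e.g. a full flow snapshot).
final class LazySnapshotIterator<K, S> implements Iterator<S> {
    private final Iterator<K> keys;
    private final Function<K, S> fetcher;

    LazySnapshotIterator(List<K> keys, Function<K, S> fetcher) {
        this.keys = keys.iterator();
        this.fetcher = fetcher;
    }

    @Override
    public boolean hasNext() {
        return keys.hasNext();
    }

    // Fetches exactly one snapshot per call; the iterator keeps no reference to it
    @Override
    public S next() {
        if (!keys.hasNext()) {
            throw new NoSuchElementException();
        }
        return fetcher.apply(keys.next());
    }
}
```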



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.shaded.com.google.common.collect.ComparisonChain;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+    private final ImportFlowVersion importFlowVersion;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+        this.importFlowVersion = new ImportFlowVersion();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+        importFlowVersion.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "Takes a directory as input, whose content must have been generated by the export-all-flows command. " +
+                "Based on the file contents, the corresponding buckets, flows and flow versions will be created. " +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP));
+        final boolean isInteractive = getContext().isInteractive();
+
+        // Gather all buckets and create a map for easier search by bucket name
+        final Map<String, String> bucketMap = getBucketMap(client, isInteractive);
+
+        // Gather all flows and create a map for easier search by flow name.
+        // Flow names are only unique within a bucket, so the key also includes the bucket id
+        final Map<Pair<String, String>, String> flowMap = getFlowMap(client, bucketMap, isInteractive);
+        final Map<Pair<String, String>, String> flowCreated = new HashMap<>();
+
+        // Gather all flow versions and create a map for easier search by flow id
+        final Map<String, List<Integer>> versionMap = getVersionMap(client, flowMap, isInteractive);
+
+        // Create file path list
+        final List<String> files = getFilePathList(properties);
+
+        // Deserialize file content
+        final List<ImportedSnapshot> importedSnapshots = deserializeSnapshots(files);
+
+        // Versions must be imported in order, so sort the snapshots by bucket name, flow name and version
+        importedSnapshots.sort((o1, o2) -> ComparisonChain.start()
+                .compare(o1.getSnapshot().getBucket().getName(), o2.getSnapshot().getBucket().getName())
+                .compare(o1.getSnapshot().getFlow().getName(), o2.getSnapshot().getFlow().getName())
+                .compare(o1.getSnapshot().getSnapshotMetadata().getVersion(), o2.getSnapshot().getSnapshotMetadata().getVersion())
+                .result());
+
+        for (final ImportedSnapshot snapshot : importedSnapshots) {
+
+            final String inputSource = snapshot.getInputSource();
+            final String bucketName = snapshot.getSnapshot().getBucket().getName();
+            final String bucketDescription = snapshot.getSnapshot().getBucket().getDescription();
+            final String flowName = snapshot.getSnapshot().getFlow().getName();
+            final String flowDescription = snapshot.getSnapshot().getFlow().getDescription();
+            final int flowVersion = snapshot.getSnapshot().getSnapshotMetadata().getVersion();
+            // The original bucket and flow ids must be kept, otherwise NiFi won't be able to synchronize with the NiFi Registry
+            final String bucketId = snapshot.getSnapshot().getBucket().getIdentifier();
+            final String flowId = snapshot.getSnapshot().getFlow().getIdentifier();
+
+            // Create bucket if missing
+            if (bucketMap.containsKey(bucketName)) {
+                printMessage(isInteractive, bucketName + SKIPPING_BUCKET_CREATION);
+            } else {
+                createBucket(client, bucketMap, bucketName, bucketDescription, bucketId);
+            }
+
+            // Create flow if missing
+            if (flowMap.containsKey(new ImmutablePair<>(bucketId, flowName))) {
+                if (skip) {
+                    printMessage(isInteractive, flowName + SKIPPING_IMPORT);
+                    continue;
+                } else {
+                    printMessage(isInteractive, flowName + SKIPPING_FLOW_CREATION);
+                }
+            } else if (!flowCreated.containsKey(new ImmutablePair<>(bucketId, flowName))) {
+                createFlow(client, flowCreated, flowId, flowName, flowDescription, bucketId);
+            }
+
+            // Create missing flow versions
+            if (!versionMap.getOrDefault(flowId, Collections.emptyList()).contains(flowVersion)) {
+                createFlowVersion(client, inputSource, flowId);
+            }
+        }
+        return new StringResult(IMPORT_COMPLETED, getContext().isInteractive());
+    }
+
+    private Map<String, String> getBucketMap(final NiFiRegistryClient client, final boolean isInteractive) throws IOException, NiFiRegistryException {
+        final Map<String, String> bucketMap = listBuckets.doExecute(client, new Properties())
+                .getResult()
+                .stream()
+                .collect(Collectors.toMap(Bucket::getName, Bucket::getIdentifier));
+        // Report progress only after the buckets have actually been collected
+        printMessage(isInteractive, ALL_BUCKETS_COLLECTED);
+        return bucketMap;
+    }
+
+    private Map<Pair<String, String>, String> getFlowMap(final NiFiRegistryClient client, final Map<String, String> bucketMap,
+                                                         final boolean isInteractive) throws ParseException, IOException, NiFiRegistryException {
+        final Map<Pair<String, String>, String> flowMap = getVersionedFlows(client, bucketMap)
+                .stream()
+                .collect(Collectors.toMap(e -> new ImmutablePair<>(e.getBucketIdentifier(), e.getName()),
+                        VersionedFlow::getIdentifier));
+        // Report progress only after the flows have actually been collected
+        printMessage(isInteractive, ALL_FLOWS_COLLECTED);
+        return flowMap;
+    }
+
+    private List<VersionedFlow> getVersionedFlows(final NiFiRegistryClient client, final Map<String, String> bucketMap) throws ParseException, IOException, NiFiRegistryException {
+        final List<VersionedFlow> flows = new ArrayList<>();
+        for (final String id : bucketMap.values()) {
+            final Properties flowProperties = new Properties();
+            flowProperties.setProperty(CommandOption.BUCKET_ID.getLongName(), id);
+
+            flows.addAll(listFlows.doExecute(client, flowProperties).getResult());
+        }
+        return flows;
+    }
+
+    private Map<String, List<Integer>> getVersionMap(final NiFiRegistryClient client, final Map<Pair<String, String>, String> flowMap,
+                                                     final boolean isInteractive) throws ParseException, IOException, NiFiRegistryException {
+        final Map<String, List<Integer>> versionMap = getVersionedFlowSnapshotMetadataList(client, flowMap)
+                .stream()
+                .collect(Collectors.groupingBy(VersionedFlowSnapshotMetadata::getFlowIdentifier,
+                        Collectors.mapping(VersionedFlowSnapshotMetadata::getVersion, Collectors.toList())));
+        // Report progress only after the flow versions have actually been collected
+        printMessage(isInteractive, ALL_FLOW_VERSIONS_COLLECTED);
+        return versionMap;
+    }
+
+    private List<VersionedFlowSnapshotMetadata> getVersionedFlowSnapshotMetadataList(final NiFiRegistryClient client,
+                                                                                     final Map<Pair<String, String>, String> flowMap) throws ParseException, IOException, NiFiRegistryException {
+        final List<VersionedFlowSnapshotMetadata> versions = new ArrayList<>();
+        for (final String flowId : flowMap.values()) {
+            final Properties flowVersionProperties = new Properties();
+            flowVersionProperties.setProperty(CommandOption.FLOW_ID.getLongName(), flowId);
+
+            versions.addAll(listFlowVersions.doExecute(client, flowVersionProperties).getResult());
+        }
+        return versions;
+    }
+
+    private List<String> getFilePathList(final Properties properties) throws MissingOptionException, NiFiRegistryException {
+        final String directory = getRequiredArg(properties, CommandOption.INPUT_SOURCE);
+        final List<String> files;
+
+        try (final Stream<Path> paths = Files.list(Paths.get(directory))) {
+            files = paths
+                    .filter(Files::isRegularFile)
+                    .filter(path -> path.getFileName().toString().startsWith(FILE_NAME_PREFIX))
+                    .map(Path::toString)
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new NiFiRegistryException("File listing not possible due to ", e);
+        }
+        return files;
+    }
+
+    private List<ImportedSnapshot> deserializeSnapshots(final List<String> files) throws IOException {
+        final List<ImportedSnapshot> importedSnapshots = new ArrayList<>();
+        final ObjectMapper mapper = JacksonUtils.getObjectMapper();

Review Comment:
   `ObjectMapper` is thread safe, so this can be declared as a member variable on the class.



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+        final List<ImportedSnapshot> importedSnapshots = deserializeSnapshots(files);

Review Comment:
   Instead of deserializing all snapshots up front, is it possible to iterate through them one at a time? That could create problems for sorting, so perhaps there is no way to avoid loading everything.
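One possible middle ground, sketched here under stated assumptions: keep only a small sort key per file (bucket name, flow name, version and file path), sort those keys, and deserialize each full snapshot only when its turn comes. The `SnapshotKey` class and `orderedPaths` helper are hypothetical and not part of the PR. How the keys are obtained is also an assumption, for example by parsing each file once, keeping only these fields, and reading the file again at import time (two reads per file in exchange for bounded memory). The sketch uses the JDK's `Comparator.comparing(...).thenComparing(...)` chain, which yields the same ordering as the `ComparisonChain` in the PR without depending on Curator's shaded Guava.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SnapshotOrdering {

    // Hypothetical lightweight index entry: only the fields needed for ordering,
    // plus the path of the export file that holds the full snapshot.
    static final class SnapshotKey {
        private final String bucketName;
        private final String flowName;
        private final int version;
        private final String path;

        SnapshotKey(String bucketName, String flowName, int version, String path) {
            this.bucketName = bucketName;
            this.flowName = flowName;
            this.version = version;
            this.path = path;
        }

        String getBucketName() { return bucketName; }
        String getFlowName() { return flowName; }
        int getVersion() { return version; }
        String getPath() { return path; }
    }

    // Bucket name, then flow name, then version: the same order as the ComparisonChain in the PR.
    static final Comparator<SnapshotKey> IMPORT_ORDER = Comparator
            .comparing(SnapshotKey::getBucketName)
            .thenComparing(SnapshotKey::getFlowName)
            .thenComparingInt(SnapshotKey::getVersion);

    // Returns file paths in import order; each file would then be deserialized one at a time.
    static List<String> orderedPaths(List<SnapshotKey> keys) {
        final List<SnapshotKey> sorted = new ArrayList<>(keys);
        sorted.sort(IMPORT_ORDER);
        final List<String> paths = new ArrayList<>();
        for (final SnapshotKey key : sorted) {
            paths.add(key.getPath());
        }
        return paths;
    }

    public static void main(String[] args) {
        final List<SnapshotKey> keys = List.of(
                new SnapshotKey("bucketB", "flow1", 1, "b_flow1_v1.json"),
                new SnapshotKey("bucketA", "flow2", 2, "a_flow2_v2.json"),
                new SnapshotKey("bucketA", "flow2", 1, "a_flow2_v1.json"),
                new SnapshotKey("bucketA", "flow1", 3, "a_flow1_v3.json"));
        System.out.println(orderedPaths(keys));
    }
}
```

Running `main` prints `[a_flow1_v3.json, a_flow2_v1.json, a_flow2_v2.json, b_flow1_v1.json]`: entries are grouped by bucket and flow, with versions in ascending order, so only one full snapshot needs to be in memory at a time.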



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,298 @@
+        } catch (Exception e) {
+            throw new NiFiRegistryException("File listing not possible due to ", e);

Review Comment:
   This approach to error messages needs to be changed. The message should stand on its own; it will not be concatenated with the message of the exception. Recommend the following:
   ```suggestion
               throw new NiFiRegistryException("File listing failed", e);
   ```
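A minimal sketch of why the standalone message matters, using `java.io.IOException` as a stand-in for `NiFiRegistryException` (assumed here to pass its `(String, Throwable)` arguments through to `Throwable`): `getMessage()` returns only the text given to the constructor, so a message ending in "due to " is shown as a dangling fragment, while the original failure stays reachable through `getCause()`. The class and method names below are illustrative only.

```java
import java.io.IOException;
import java.nio.file.NoSuchFileException;

public class ExceptionMessageDemo {

    // Wraps a low-level failure the way the suggested change does:
    // a self-contained message plus the original exception as the cause.
    static IOException wrapListingFailure(Exception cause) {
        return new IOException("File listing failed", cause);
    }

    public static void main(String[] args) {
        final Exception cause = new NoSuchFileException("/tmp/missing-export-dir");
        final IOException wrapped = wrapListingFailure(cause);

        // getMessage() returns only the constructor message; the cause's message is not appended.
        System.out.println(wrapped.getMessage());
        // The original failure remains available through getCause().
        System.out.println(wrapped.getCause().getMessage());
    }
}
```

Running `main` prints `File listing failed` followed by `/tmp/missing-export-dir`, the message of the wrapped `NoSuchFileException`.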





[GitHub] [nifi] timeabarna commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "timeabarna (via GitHub)" <gi...@apache.org>.
timeabarna commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1171164642


##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.shaded.com.google.common.collect.ComparisonChain;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private static final String FILE_NAME_SEPARATOR = "_";
+    private static final String STORAGE_LOCATION_URL = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+    private static final ObjectMapper MAPPER = JacksonUtils.getObjectMapper();
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP_EXISTING.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows and flow versions will be created. " +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP_EXISTING));

Review Comment:
   Is there a situation in one of the use cases where you would want to use `--skipExisting true` and not get `v3` imported?
   The only scenario that comes to mind is when there are 2 buckets in Registry A and only one of these buckets exists in Registry B. No continuous synchronisation is needed; however, the missing bucket can be created with all its flows and versions without changing anything in the existing bucket.
   
   You are right, though, that this does not fit into the two provided use cases.
   
   I am fine with making `--skipExisting` an option without an argument for now; however, I would keep the option for possible future enhancements.
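
   A minimal Java sketch of the `--skipExisting` semantics discussed in this thread, assuming the importer already knows, per flow, which versions the target registry holds. `VersionImportPlanner` and `versionsToImport` are hypothetical names, not part of the PR. A flow missing from the target (for example because its whole bucket is missing) gets all exported versions; an existing flow gets nothing when skipping, and only its missing versions otherwise.

   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;

   // Hypothetical helper illustrating the skip semantics; not part of the PR.
   final class VersionImportPlanner {

       private VersionImportPlanner() {
       }

       /**
        * Decides which exported versions of a single flow should be imported.
        *
        * @param exportedVersions versions found in the export directory for the flow
        * @param existingVersions versions already stored in the target registry (empty if the flow is missing)
        * @param flowExists       whether the flow already exists in the target registry
        * @param skipExisting     value of the skip option
        * @return the versions to import, in ascending order
        */
       static List<Integer> versionsToImport(final List<Integer> exportedVersions,
                                             final Set<Integer> existingVersions,
                                             final boolean flowExists,
                                             final boolean skipExisting) {
           if (flowExists && skipExisting) {
               // Existing flow is left untouched, even if the export contains newer versions (e.g. v3)
               return List.of();
           }
           // Missing flow (possibly in a missing bucket) or skipping disabled:
           // import every exported version the target does not have yet
           return exportedVersions.stream()
                   .filter(version -> !existingVersions.contains(version))
                   .sorted()
                   .collect(Collectors.toList());
       }
   }
   ```

   With `v1` and `v2` already present and `v3` in the export, `versionsToImport(List.of(1, 2, 3), Set.of(1, 2), true, false)` yields `[3]`, while passing `true` for `skipExisting` yields an empty list.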



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] bbende commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "bbende (via GitHub)" <gi...@apache.org>.
bbende commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1170134766


##########
nifi-docs/src/main/asciidoc/toolkit-guide.adoc:
##########
@@ -768,6 +770,325 @@ This command migrates nifi.properties back from AWS_KMS to AES_GCM protection sc
 -v
 ----
 
+[[export_import_all_flows]]
+=== Export All Flows
+You can use the `export-all-flows` command to perform the following tasks:
+
+* List all the buckets
+* For each bucket, list all flows
+* For each flow, list all versions
+* Export each version into a provided directory
+
+Running the command requires an `--outputDirectory` parameter.
+
+=== Import All Flows
+You can use the `import-all-flows` command to perform the following tasks:
+
+* List all files, each representing a flow version, in a directory created by `export-all-flows`
+* Create all the corresponding buckets
+* Create all the corresponding flows
+* Import all the corresponding flow versions
+
+Running the command requires 2 parameters:
+
+* `--input`: the directory to read the files from
+* `--skipExisting`: configures how existing flows and flow versions are handled.
+If `true`, flow and flow version creation is skipped, even if some flow versions are missing.
+If `false`, the missing flow versions are created. The default value is `true` (skip creation).
+
+=== Usage
+The input source for an import-all-flows command must be created by an export-all-flows command.
+To avoid migration conflicts, no modification should be performed in the NiFi Registry during this activity.
+Buckets and flows with the same name are considered equal.
+
+* Export all flow versions:
+
+ ./bin/cli.sh registry export-all-flows -u http://localhost:18080 --outputDirectory "/my_dir/flow_exports"
+
+* Import all flow versions:
+
+ ./bin/cli.sh registry import-all-flows -u http://localhost:18080 --input "/my_dir/flow_exports" --skipExisting false
+
+=== Expected behaviour
+==== Use case 1: reconfiguring an existing NiFi Registry
+
+NiFi is connected to a NiFi Registry; the NiFi Registry instance stays the same and only its configuration changes.
+All the data will be created.
+
+1. Export versions:
+
+ ./bin/cli.sh registry export-all-flows -u http://localhost:18080 --outputDirectory "/my_dir/flow_exports"
+
+2. Stop the registry
+
+3. Switch the provider
+
+4. Start the registry
+
+5. Import versions:
+
+ ./bin/cli.sh registry import-all-flows -u http://localhost:18080 --input "/my_dir/flow_exports" --skipExisting true
+
+
+==== Use case 2: data replication
+
+NiFi_1 is connected to NiFi Registry_1 and NiFi_2 is connected to NiFi Registry_2.
+
+For disaster recovery purposes, the data from NiFi Registry_1 needs to be periodically replicated to NiFi Registry_2 via a scheduled job.
+
+The initial version of NiFi Registry_2 needs to be created by this tool.
+
+The missing buckets, flows and versions will be created. If the bucket and flow already exist, only the missing versions will be created.
+
+1. Export versions:
+
+ ./bin/cli.sh registry export-all-flows -u http://nifi-registry-1:18080 --outputDirectory "/my_dir/flow_exports"
+
+2. Import versions:
+
+ ./bin/cli.sh registry import-all-flows -u http://nifi-registry-2:18080 --input "/my_dir/flow_exports" --skipExisting false
+
+

Review Comment:
   I think this is adding back a bunch of sections that were removed on main. Need to update so that this is only adding your new section, and everything afterwards starting with `File Manager` down to `tls_toolkit` is removed.



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,331 @@
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private static final String FILE_NAME_SEPARATOR = "_";
+    private static final String STORAGE_LOCATION_URL = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+    private static final ObjectMapper MAPPER = JacksonUtils.getObjectMapper();
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP_EXISTING.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows and flow versions will be created. " +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP_EXISTING));
+        final boolean isInteractive = getContext().isInteractive();
+
+        //Gather all buckets and create a map for easier search by bucket name
+        final Map<String, String> bucketMap = getBucketMap(client, isInteractive);
+
+        // Gather all flows and create a map for easier search by flow name.
+        // As flow name is only unique within the same bucket we need to use the bucket id in the key as well
+        final Map<Pair<String, String>, String> flowMap = getFlowMap(client, bucketMap, isInteractive);
+        final Map<Pair<String, String>, String> flowCreated = new HashMap<>();
+
+        // Gather all flow versions and create a map for easier search by flow id
+        final Map<String, List<Integer>> versionMap = getVersionMap(client, flowMap, isInteractive);
+
+        // Create file path list
+        final List<VersionFileMetaData> files = getFilePathList(properties);
+
+        // As we need to keep the version order the list needs to be sorted
+        files.sort((o1, o2) -> ComparisonChain.start()
+                .compare(o1.getBucketName(), o2.getBucketName())
+                .compare(o1.getFlowName(), o2.getFlowName())
+                .compare(o1.getVersion(), o2.getVersion())
+                .result());
+
+        for (VersionFileMetaData file : files) {
+            final String inputSource = file.getInputSource();
+            final String fileContent = getInputSourceContent(inputSource);
+            final VersionedFlowSnapshot snapshot = MAPPER.readValue(fileContent, VersionedFlowSnapshot.class);
+
+            final String bucketName = snapshot.getBucket().getName();
+            final String bucketDescription = snapshot.getBucket().getDescription();
+            final String flowName = snapshot.getFlow().getName();
+            final String flowDescription = snapshot.getFlow().getDescription();
+            final int flowVersion = snapshot.getSnapshotMetadata().getVersion();
+            // The original bucket and flow ids must be kept otherwise NiFi won't be able to synchronize with the NiFi Registry
+            final String flowId = snapshot.getFlow().getIdentifier();
+            final String bucketId = snapshot.getBucket().getIdentifier();

Review Comment:
   Could we print a message at the beginning and end of this loop, like:
   ```
   Importing <flow Name> - <flowVersion> to <bucketName>
   ...
   Successfully imported <flow Name> - <flowVersion> to <bucketName>
   ```
   I think this would make it easier to keep track of where the import currently is.
   I guess the success message would only be printed when the version is not skipped.
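
   A sketch of the requested progress messages, combined with the bucket/flow/version ordering that the PR currently builds with the shaded Guava `ComparisonChain`; here plain `java.util.Comparator` is used instead. `ImportProgress`, `VersionFile` and the `alreadyExists` predicate are hypothetical stand-ins for the PR's `VersionFileMetaData` and registry lookups, and the actual import call is omitted. Messages are collected in a list so the caller decides where to print them.

   ```java
   import java.util.ArrayList;
   import java.util.Comparator;
   import java.util.List;
   import java.util.function.Predicate;

   // Hypothetical sketch; the real command reads files and calls the registry client instead.
   final class ImportProgress {

       // Minimal stand-in for the PR's VersionFileMetaData
       static final class VersionFile {
           final String bucketName;
           final String flowName;
           final int version;

           VersionFile(final String bucketName, final String flowName, final int version) {
               this.bucketName = bucketName;
               this.flowName = flowName;
               this.version = version;
           }
       }

       // Same ordering as the ComparisonChain in the PR: bucket name, then flow name, then version
       static final Comparator<VersionFile> IMPORT_ORDER = Comparator
               .comparing((VersionFile file) -> file.bucketName)
               .thenComparing(file -> file.flowName)
               .thenComparingInt(file -> file.version);

       private ImportProgress() {
       }

       static List<String> importAll(final List<VersionFile> files, final Predicate<VersionFile> alreadyExists) {
           final List<VersionFile> sorted = new ArrayList<>(files);
           sorted.sort(IMPORT_ORDER);

           final List<String> messages = new ArrayList<>();
           for (final VersionFile file : sorted) {
               final String target = file.flowName + " - " + file.version + " to " + file.bucketName;
               messages.add("Importing " + target);
               if (alreadyExists.test(file)) {
                   // Mirrors the PR's SKIPPING_IMPORT message; no success message when skipping
                   messages.add(target + " already exists, skipping import...");
                   continue;
               }
               // ... create bucket/flow if needed and import the snapshot here ...
               messages.add("Successfully imported " + target);
           }
           return messages;
       }
   }
   ```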
   



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/result/registry/VersionedFlowSnapshotsResult.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.result.registry;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.toolkit.cli.api.WritableResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Iterator;
+
+/**
+ * Result for a list of VersionedFlowSnapshots.
+ *
+ * If this result was created with a non-null exportDirectoryName, then the write method will ignore
+ * the passed-in PrintStream, and will write each serialized snapshot to the given directory.
+ * The file name will be generated from the bucket name, the flow name and the version.
+ *
+ * If this result was created with a null exportDirectoryName, then the write method will write the
+ * serialized snapshots to the given PrintStream.
+ */
+public class VersionedFlowSnapshotsResult implements WritableResult<Iterator<VersionedFlowSnapshot>> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all";
+    private static final String EXPORT_FILE_NAME = "%s/%s_%s_%s_%d";
+    private static final String SEPARATOR = "_";
+    private static final String REPLACEMENT = "-";
+    private final Iterator<VersionedFlowSnapshot> versionedFlowSnapshots;
+    private final String exportDirectoryName;
+
+    public VersionedFlowSnapshotsResult(final Iterator<VersionedFlowSnapshot> versionedFlowSnapshots, final String exportDirectoryName) {
+        this.versionedFlowSnapshots = versionedFlowSnapshots;
+        this.exportDirectoryName = exportDirectoryName;
+        Validate.notNull(this.versionedFlowSnapshots);
+    }
+
+    @Override
+    public Iterator<VersionedFlowSnapshot> getResult() {
+        return versionedFlowSnapshots;
+    }
+
+    @Override
+    public void write(final PrintStream output) throws IOException {
+        while (versionedFlowSnapshots.hasNext()) {
+            final VersionedFlowSnapshot versionedFlowSnapshot = versionedFlowSnapshots.next();
+            if (exportDirectoryName != null) {
+                final String bucketName = versionedFlowSnapshot.getBucket().getName().replaceAll(SEPARATOR, REPLACEMENT);
+                final String flowName = versionedFlowSnapshot.getFlow().getName().replaceAll(SEPARATOR, REPLACEMENT);
+                final int version = versionedFlowSnapshot.getSnapshotMetadata().getVersion();
+                final String exportFileName = String.format(EXPORT_FILE_NAME, exportDirectoryName, FILE_NAME_PREFIX, bucketName, flowName, version);
+                try (final OutputStream resultOut = Files.newOutputStream(Paths.get(exportFileName))) {
+                    JacksonUtils.write(versionedFlowSnapshot, resultOut);
+                }

Review Comment:
   If the output directory doesn't exist, currently the export fails, but it isn't very clear why...
   ```
   #> registry export-all-flows --outputDirectory /tmp/registry-export -verbose
   
   All buckets collected...
   
   
   All flows collected...
   
   
   All flow versions collected...
   
   
   ERROR: /tmp/registry-export/toolkit_registry_export_all_B1_Resuable_1
   ```
   
   With `-verbose` added, we can see:
   ```
   java.nio.file.NoSuchFileException: /tmp/registry-export/toolkit_registry_export_all_B1_Resuable_1
   	at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
   	at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
   	at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
   	at java.base/sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:219)
   	at java.base/java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:478)
   	at java.base/java.nio.file.Files.newOutputStream(Files.java:220)
   	at org.apache.nifi.toolkit.cli.impl.result.registry.VersionedFlowSnapshotsResult.write(VersionedFlowSnapshotsResult.java:69)
   	at org.apache.nifi.toolkit.cli.impl.command.CommandProcessor.processCommand(CommandProcessor.java:256)
   	at org.apache.nifi.toolkit.cli.impl.command.CommandProcessor.processGroupCommand(CommandProcessor.java:233)
   	at org.apache.nifi.toolkit.cli.impl.command.CommandProcessor.process(CommandProcessor.java:188)
   	at org.apache.nifi.toolkit.cli.CLIMain.runInteractiveCLI(CLIMain.java:124)
   	at org.apache.nifi.toolkit.cli.CLIMain.main(CLIMain.java:68)
   ```
   
   Maybe we should add something here like:
   ```
   catch(Exception e) {
     throw new RuntimeException("Unable to write flow snapshot to: " + exportFileName, e);
   }
   ```
   ?
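
   A sketch of clearer failure reporting for the export, assuming the serialized snapshot is already available as a JSON string (the PR writes it with `JacksonUtils.write`). It checks the directory up front and wraps any write failure in an `IOException` that names the target file; `IOException` is used here instead of `RuntimeException` because `write(PrintStream)` already declares it. The file name follows the PR's `EXPORT_FILE_NAME` pattern with `_` replaced by `-` inside names; `String.replace` is equivalent to the PR's `replaceAll` here since `_` is not a regex metacharacter. `SnapshotFileWriter` is a hypothetical name.

   ```java
   import java.io.IOException;
   import java.io.OutputStream;
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.Path;

   // Hypothetical helper; not part of the PR.
   final class SnapshotFileWriter {
       private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all";

       private SnapshotFileWriter() {
       }

       // "_" separates the fields, so underscores inside names are replaced with "-"
       static String exportFileName(final String bucketName, final String flowName, final int version) {
           return String.format("%s_%s_%s_%d", FILE_NAME_PREFIX,
                   bucketName.replace("_", "-"), flowName.replace("_", "-"), version);
       }

       static Path write(final Path exportDirectory, final String fileName, final String serializedSnapshot) throws IOException {
           if (!Files.isDirectory(exportDirectory)) {
               throw new IOException("Export directory does not exist or is not a directory: " + exportDirectory);
           }
           final Path exportFile = exportDirectory.resolve(fileName);
           try (OutputStream out = Files.newOutputStream(exportFile)) {
               out.write(serializedSnapshot.getBytes(StandardCharsets.UTF_8));
           } catch (final IOException e) {
               // Keep the original exception as the cause so the underlying error is not lost
               throw new IOException("Unable to write flow snapshot to: " + exportFile, e);
           }
           return exportFile;
       }
   }
   ```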



##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java:
##########
@@ -279,15 +279,21 @@ public Response createFlowVersion(
             @ApiParam(value = "The flow identifier")
                 final String flowId,
             @ApiParam(value = "The new versioned flow snapshot.", required = true)
-                final VersionedFlowSnapshot snapshot) {
+                final VersionedFlowSnapshot snapshot,
+            @ApiParam(
+                value = "Whether source properties like author should be kept")
+            @QueryParam("preserveSourceProperties")
+            final boolean preserveSourceProperties) {
 
         verifyPathParamsMatchBody(bucketId, flowId, snapshot);
 
         // bucketId and flowId fields are optional in the body parameter, but required before calling the service layer
         setSnaphotMetadataIfMissing(bucketId, flowId, snapshot);
 
-        final String userIdentity = NiFiUserUtils.getNiFiUserIdentity();
-        snapshot.getSnapshotMetadata().setAuthor(userIdentity);
+        if (!preserveSourceProperties) {
+            final String userIdentity = NiFiUserUtils.getNiFiUserIdentity();
+            snapshot.getSnapshotMetadata().setAuthor(userIdentity);
+        }

Review Comment:
   If `preserveSourceProperties` is `true`, we should enforce that `snapshot.snapshotMetadata.author` is not blank. It is possible that some lower layer already enforces that the author can't be blank/null, and if so, that is probably OK; if not, we should enforce it here by throwing `BadRequestException`.
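
   A framework-free sketch of the suggested check. In `BucketFlowResource` the failure would be the JAX-RS `BadRequestException` as suggested above; here a hypothetical `InvalidSnapshotException` stands in so the example has no dependencies, and `AuthorPolicy.resolveAuthor` is likewise a hypothetical name. The current user's identity is passed in rather than read from `NiFiUserUtils`.

   ```java
   // Hypothetical stand-in for javax.ws.rs.BadRequestException
   final class InvalidSnapshotException extends RuntimeException {
       InvalidSnapshotException(final String message) {
           super(message);
       }
   }

   // Hypothetical helper; not part of the PR.
   final class AuthorPolicy {

       private AuthorPolicy() {
       }

       /**
        * Returns the author to store on the snapshot metadata.
        */
       static String resolveAuthor(final String snapshotAuthor, final String currentUserIdentity,
                                   final boolean preserveSourceProperties) {
           if (!preserveSourceProperties) {
               // Existing behavior: the requesting user becomes the author
               return currentUserIdentity;
           }
           if (snapshotAuthor == null || snapshotAuthor.trim().isEmpty()) {
               throw new InvalidSnapshotException("Snapshot author is required when preserveSourceProperties is true");
           }
           return snapshotAuthor;
       }
   }
   ```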



##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,331 @@
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private static final String FILE_NAME_SEPARATOR = "_";
+    private static final String STORAGE_LOCATION_URL = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+    private static final ObjectMapper MAPPER = JacksonUtils.getObjectMapper();
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP_EXISTING.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows and flow versions will be created. " +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP_EXISTING));

Review Comment:
   It seems like `SKIP_EXISTING` really only relates to skipping the import of versions for a flow when the flow already exists. So say my registry already has `Flow 1` with `v1` and `v2`, and now I am trying to do an import that has the same flow with one additional version, `v3`. If I specify `--skipExisting true`, then `v3` does not get imported, and if I specify `--skipExisting false`, then it does get imported.
   
   Is there a situation in one of the use cases where you would want to use `--skipExisting true` and not get `v3` imported?
   
   If we do keep `--skipExisting`, I think it might be clearer to make it a non-required argument: if it is not specified, the behavior is the same as `--skipExisting false`. If it is specified, it takes no value, and specifying `--skipExisting` means `--skipExisting true`.
   
   Default usage would just be:
   ```
   registry import-all-flows -i /path/to/export
   ```
   
   Thoughts?
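
   A plain-Java sketch of the proposed option semantics: `--skipExisting` becomes an optional flag without a value (absent means `false`), and `-i`/`--input` stays required. `ImportAllFlowsArgs` is a hypothetical, deliberately tiny parser that only knows these two options; the real CLI defines its options through `CommandOption` and Apache Commons CLI, where such a flag would typically be an option without an argument, checked with `CommandLine.hasOption`.

   ```java
   // Hypothetical parser illustrating the proposal; not how the NiFi CLI parses options.
   final class ImportAllFlowsArgs {
       final String input;
       final boolean skipExisting;

       private ImportAllFlowsArgs(final String input, final boolean skipExisting) {
           this.input = input;
           this.skipExisting = skipExisting;
       }

       static ImportAllFlowsArgs parse(final String... args) {
           String input = null;
           boolean skipExisting = false; // default when the flag is absent
           for (int i = 0; i < args.length; i++) {
               switch (args[i]) {
                   case "-i":
                   case "--input":
                       if (i + 1 >= args.length) {
                           throw new IllegalArgumentException("Missing value for " + args[i]);
                       }
                       input = args[++i];
                       break;
                   case "--skipExisting":
                       skipExisting = true; // presence alone means true; no value is consumed
                       break;
                   default:
                       throw new IllegalArgumentException("Unknown argument: " + args[i]);
               }
           }
           if (input == null) {
               throw new IllegalArgumentException("--input is required");
           }
           return new ImportAllFlowsArgs(input, skipExisting);
       }
   }
   ```

   Under this scheme, the default invocation `registry import-all-flows -i /path/to/export` imports the missing versions, and appending `--skipExisting` opts into skipping existing flows.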





[GitHub] [nifi] timeabarna commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "timeabarna (via GitHub)" <gi...@apache.org>.
timeabarna commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1171164642


##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,331 @@
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private static final String FILE_NAME_SEPARATOR = "_";
+    private static final String STORAGE_LOCATION_URL = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+    private static final ObjectMapper MAPPER = JacksonUtils.getObjectMapper();
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP_EXISTING.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows and flow versions will be created. " +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP_EXISTING));

Review Comment:
   Is there a situation in one of the use cases where you would want to use --skipExisting true and not get v3 imported?
   The only scenario that comes to my mind is when there are 2 buckets in Registry A and only one of these buckets can be found in Registry B. There, no continuous synchronisation is needed; however, the missing bucket can be created with all its flows and versions without changing anything in the existing bucket.
   
   You are right, though, that this does not fit into the two provided use cases.
   
   I am fine with having --skipExisting as a no-arg option for now; however, I would keep the option for possible future enhancements.
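To make the v3 question concrete, here is a minimal sketch of the `--skipExisting` semantics as the documentation in this PR describes them, assuming that a missing bucket or a missing flow is always created with all of its exported versions, while for an existing flow the missing versions are backfilled only when `--skipExisting` is false. Registry state is modelled as plain maps keyed by name; `SkipExistingPlanner`, `ExportedVersion` and `plan` are hypothetical names, not part of the NiFi CLI.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the --skipExisting semantics; not part of the NiFi CLI.
// Target registry state: bucket name -> flow name -> existing version numbers.
final class SkipExistingPlanner {

    // One exported flow version, identified by names because the import matches by name.
    static final class ExportedVersion {
        final String bucket;
        final String flow;
        final int version;

        ExportedVersion(String bucket, String flow, int version) {
            this.bucket = bucket;
            this.flow = flow;
            this.version = version;
        }

        @Override
        public String toString() {
            return bucket + "/" + flow + "/v" + version;
        }
    }

    // Returns the exported versions that would be imported into the target registry.
    static List<String> plan(Map<String, Map<String, Set<Integer>>> target,
                             List<ExportedVersion> exported,
                             boolean skipExisting) {
        List<String> toImport = new ArrayList<>();
        for (ExportedVersion v : exported) {
            Map<String, Set<Integer>> flows = target.getOrDefault(v.bucket, Collections.emptyMap());
            Set<Integer> existingVersions = flows.get(v.flow);
            if (existingVersions == null) {
                // Missing bucket or missing flow (assumed): created with every exported version.
                toImport.add(v.toString());
            } else if (!skipExisting && !existingVersions.contains(v.version)) {
                // Existing flow: missing versions are backfilled only when skipping is disabled.
                toImport.add(v.toString());
            }
            // Existing flow with skipExisting=true: nothing is imported for it.
        }
        return toImport;
    }
}
```

With a target holding `bucketA/flowX` v1 and v2, and an export containing v1 to v3 plus a new `bucketB/flowY`, `plan(..., true)` returns only `bucketB/flowY/v1`, so v3 is not imported, while `plan(..., false)` also returns `bucketA/flowX/v3`. The bucket-only scenario above corresponds to the first branch, which behaves the same for both settings.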



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [nifi] timeabarna commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "timeabarna (via GitHub)" <gi...@apache.org>.
timeabarna commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1171164642


##########
nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/registry/flow/ImportAllFlows.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.toolkit.cli.impl.command.registry.flow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.curator.shaded.com.google.common.collect.ComparisonChain;
+import org.apache.nifi.flow.VersionedFlowCoordinates;
+import org.apache.nifi.flow.VersionedProcessGroup;
+import org.apache.nifi.registry.bucket.Bucket;
+import org.apache.nifi.registry.client.BucketClient;
+import org.apache.nifi.registry.client.FlowClient;
+import org.apache.nifi.registry.client.FlowSnapshotClient;
+import org.apache.nifi.registry.client.NiFiRegistryClient;
+import org.apache.nifi.registry.client.NiFiRegistryException;
+import org.apache.nifi.registry.flow.VersionedFlow;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
+import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
+import org.apache.nifi.toolkit.cli.api.CommandException;
+import org.apache.nifi.toolkit.cli.api.Context;
+import org.apache.nifi.toolkit.cli.impl.command.CommandOption;
+import org.apache.nifi.toolkit.cli.impl.command.registry.AbstractNiFiRegistryCommand;
+import org.apache.nifi.toolkit.cli.impl.command.registry.bucket.ListBuckets;
+import org.apache.nifi.toolkit.cli.impl.result.StringResult;
+import org.apache.nifi.toolkit.cli.impl.util.JacksonUtils;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ImportAllFlows extends AbstractNiFiRegistryCommand<StringResult> {
+    private static final String FILE_NAME_PREFIX = "toolkit_registry_export_all_";
+    private static final String SKIPPING_BUCKET_CREATION = " already exists, skipping bucket creation...";
+    private static final String SKIPPING_IMPORT = " already exists, skipping import...";
+    private static final String SKIPPING_FLOW_CREATION = " already exists, skipping flow creation...";
+    private static final String IMPORT_COMPLETED = "Import completed...";
+    private static final String ALL_BUCKETS_COLLECTED = "All buckets collected...";
+    private static final String ALL_FLOWS_COLLECTED = "All flows collected...";
+    private static final String ALL_FLOW_VERSIONS_COLLECTED = "All flow versions collected...";
+    private static final String FILE_NAME_SEPARATOR = "_";
+    private static final String STORAGE_LOCATION_URL = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";
+    private static final ObjectMapper MAPPER = JacksonUtils.getObjectMapper();
+    private final ListBuckets listBuckets;
+    private final ListFlows listFlows;
+    private final ListFlowVersions listFlowVersions;
+
+    public ImportAllFlows() {
+        super("import-all-flows", StringResult.class);
+        this.listBuckets = new ListBuckets();
+        this.listFlows = new ListFlows();
+        this.listFlowVersions = new ListFlowVersions();
+    }
+
+    @Override
+    protected void doInitialize(Context context) {
+        addOption(CommandOption.INPUT_SOURCE.createOption());
+        addOption(CommandOption.SKIP_EXISTING.createOption());
+
+        listBuckets.initialize(context);
+        listFlows.initialize(context);
+        listFlowVersions.initialize(context);
+    }
+
+    @Override
+    public String getDescription() {
+        return "From a provided directory as input, the directory content must be generated by the export-all-flows command, " +
+                "based on the file contents, the corresponding buckets, flows and flow versions will be created. " +
+                "If not configured otherwise, already existing objects will be skipped.";
+    }
+
+    @Override
+    public StringResult doExecute(final NiFiRegistryClient client, final Properties properties) throws IOException, NiFiRegistryException, ParseException, CommandException {
+        final boolean skip = Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP_EXISTING));

Review Comment:
   Is there a situation in one of the use cases where you would want to use --skipExisting true and not get v3 imported?
   The only scenario that comes to my mind is when there are 2 buckets in Registry A and only one of these buckets can be found in Registry B. There, no continuous synchronisation is needed; however, the missing bucket can be created with all its flows and versions without changing anything in the existing bucket.
   
   You are right, though, that this does not fit into the two provided use cases.
   
   I am fine with having --skipExisting as a no-arg option for now; however, I would keep the option for future enhancement.
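The quoted `doExecute` reads the option with `Boolean.parseBoolean(getRequiredArg(properties, CommandOption.SKIP_EXISTING))`. `Boolean.parseBoolean` returns true only for the string "true" (ignoring case), so a value such as `yes` silently means false. The sketch below contrasts that value-style option with a no-arg flag that is enabled by its mere presence. `SkipExistingFlag`, `fromValue` and `fromFlag` are hypothetical names, and the presence check is deliberately naive; the CLI itself parses options with Apache Commons CLI, where a no-argument option is usually tested with `CommandLine.hasOption`.

```java
import java.util.Arrays;

// Hypothetical comparison of two ways to read --skipExisting; not the NiFi CLI's code.
final class SkipExistingFlag {

    // Value-style option, as in the quoted doExecute: only "true" (any case) yields true.
    static boolean fromValue(String value) {
        // Boolean.parseBoolean returns false for null and for any other string, e.g. "yes".
        return Boolean.parseBoolean(value);
    }

    // No-arg style: enabled simply by being present. Naive scan for illustration only;
    // it would also match "--skipExisting" passed as the value of another option.
    static boolean fromFlag(String[] args) {
        return Arrays.asList(args).contains("--skipExisting");
    }
}
```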





[GitHub] [nifi] timeabarna commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "timeabarna (via GitHub)" <gi...@apache.org>.
timeabarna commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1171168786


##########
nifi-docs/src/main/asciidoc/toolkit-guide.adoc:
##########
@@ -768,6 +770,325 @@ This command migrates nifi.properties back from AWS_KMS to AES_GCM protection sc
 -v
 ----
 
+[[export_import_all_flows]]
+=== Export All Flows
+You can use the `export-all-flows` command to perform the following tasks:
+
+* List all the buckets
+* For each bucket, list all flows
+* For each flow, list all versions
+* Export each version into a provided directory
+
+Running the command requires an `--outputDirectory` parameter.
+
+=== Import All Flows
+You can use the `import-all-flows` command to perform the following tasks:
+
+* List all files, each representing a flow version, in a directory created by `export-all-flows`
+* Create all the corresponding buckets
+* Create all the corresponding flows
+* Import all the corresponding flow versions
+
+Running the command requires two parameters:
+
+* `--input`: the directory to read the exported files from.
+* `--skipExisting`: configures how existing flows and flow versions are handled.
+If true, flow and flow version creation is skipped for a flow that already exists, even if some of its versions are missing.
+If false, the missing flow versions are created. The default value is true (skip creation).
+
+=== Usage
+The input source for an `import-all-flows` command must be created by an `export-all-flows` command.
+To avoid migration conflicts, no modifications should be made in NiFi Registry while the export and import are in progress.
+Buckets and flows with the same name are considered equal.
+
+* Export all flow versions:
+
+ ./bin/cli.sh registry export-all-flows -u http://localhost:18080 --outputDirectory "/my_dir/flow_exports"
+
+* Import all flow versions:
+
+ ./bin/cli.sh registry import-all-flows -u http://localhost:18080 --input "/my_dir/flow_exports" --skipExisting false
+
+=== Expected behaviour
+==== Use case 1: reconfiguring an existing NiFi Registry
+
+NiFi connects to NiFi Registry. The NiFi Registry instance does not change, only its configuration does.
+All the data will be re-created by the import.
+
+1. Export versions:
+
+ ./bin/cli.sh registry export-all-flows -u http://localhost:18080 --outputDirectory "/my_dir/flow_exports"
+
+2. Stop the registry.
+
+3. Switch the provider.
+
+4. Start the registry.
+
+5. Import versions:
+
+ ./bin/cli.sh registry import-all-flows -u http://localhost:18080 --input "/my_dir/flow_exports" --skipExisting true
+
+
+==== Use case 2: data replication
+
+NiFi_1 connects to NiFi Registry_1 and NiFi_2 connects to NiFi Registry_2.
+
+For disaster recovery purposes, the data from NiFi Registry_1 needs to be replicated periodically to NiFi Registry_2 by a scheduled job.
+
+The initial content of NiFi Registry_2 needs to be created by this tool.
+
+The missing buckets, flows and versions will be created. If a bucket and flow already exist, only the missing versions will be created.
+
+1. Export versions:
+
+ ./bin/cli.sh registry export-all-flows -u http://nifi-registry-1:18080 --outputDirectory "/my_dir/flow_exports"
+
+2. Import versions:
+
+ ./bin/cli.sh registry import-all-flows -u http://nifi-registry-2:18080 --input "/my_dir/flow_exports" --skipExisting false
+
+

Review Comment:
   Thanks for catching this; it was an incorrectly resolved merge conflict.
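The documentation hunk above states that buckets and flows with the same name are considered equal, and the import has to recreate each flow's versions. A minimal sketch of that ordering and name-based grouping, assuming versions should be created in ascending order per flow (the quoted `ImportAllFlows` imports Guava's `ComparisonChain`, which suggests an ordering step, but its exact use is not shown in the hunk). The JDK's `Comparator` chaining is used instead of `ComparisonChain`; `ImportOrdering`, `Entry` and `group` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of ordering and name-based grouping for import-all-flows.
final class ImportOrdering {

    // One exported flow version; bucket and flow are identified by name, not by ID.
    static final class Entry {
        final String bucketName;
        final String flowName;
        final int version;

        Entry(String bucketName, String flowName, int version) {
            this.bucketName = bucketName;
            this.flowName = flowName;
            this.version = version;
        }
    }

    // Bucket name, then flow name, then ascending version: a JDK alternative to ComparisonChain.
    static final Comparator<Entry> IMPORT_ORDER = Comparator
            .comparing((Entry e) -> e.bucketName)
            .thenComparing(e -> e.flowName)
            .thenComparingInt(e -> e.version);

    // Groups versions under a "bucket/flow" name key (assumes names contain no '/'),
    // so entries with equal bucket and flow names end up under the same key.
    static Map<String, List<Integer>> group(List<Entry> entries) {
        List<Entry> sorted = new ArrayList<>(entries);
        sorted.sort(IMPORT_ORDER);
        Map<String, List<Integer>> versionsByName = new LinkedHashMap<>();
        for (Entry e : sorted) {
            versionsByName.computeIfAbsent(e.bucketName + "/" + e.flowName, key -> new ArrayList<>())
                    .add(e.version);
        }
        return versionsByName;
    }
}
```

Sorting before grouping keeps the result independent of the order in which files are listed from the input directory, which the JDK does not guarantee for a directory listing.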





[GitHub] [nifi] timeabarna commented on pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "timeabarna (via GitHub)" <gi...@apache.org>.
timeabarna commented on PR #7092:
URL: https://github.com/apache/nifi/pull/7092#issuecomment-1486367007

   Hello @bbende,
   
   Can you please review this PR?
   
   Thanks in advance.




[GitHub] [nifi] timeabarna commented on pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "timeabarna (via GitHub)" <gi...@apache.org>.
timeabarna commented on PR #7092:
URL: https://github.com/apache/nifi/pull/7092#issuecomment-1508199943

   Hello @bbende and @exceptionfactory,
   
   Thank you very much for your review. I've updated the code based on your recommendations, improved the serialisation and added a storage location update.
   
   Can you please check again?
   
   Thanks in advance.
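The storage location update relates to the `STORAGE_LOCATION_URL` constant in the quoted `ImportAllFlows` diff, `"%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s"`, presumably used to point nested flow coordinates at the target registry. A minimal sketch of filling that pattern, assuming the placeholders are, in order, the target registry base URL, bucket ID, flow ID and version; `StorageLocation.build` and its trailing-slash handling are hypothetical additions, not the PR's code.

```java
// Hypothetical helper; only the pattern string is taken from the ImportAllFlows diff.
final class StorageLocation {

    // Copied from ImportAllFlows; the meaning of each %s below is assumed from the path segments.
    private static final String STORAGE_LOCATION_URL = "%s/nifi-registry-api/buckets/%s/flows/%s/versions/%s";

    // Builds the location of one flow version on the target registry.
    static String build(String registryBaseUrl, String bucketId, String flowId, int version) {
        // Strip trailing slashes so "http://host:18080/" does not produce "//nifi-registry-api".
        String base = registryBaseUrl.replaceAll("/+$", "");
        return String.format(STORAGE_LOCATION_URL, base, bucketId, flowId, version);
    }
}
```

For example, `build("http://nifi-registry-2:18080", "b-1", "f-1", 3)` yields `http://nifi-registry-2:18080/nifi-registry-api/buckets/b-1/flows/f-1/versions/3`.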




[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7092: NIFI-11327 Add Export/Import All - NiFi CLI - NiFi Registry

Posted by "exceptionfactory (via GitHub)" <gi...@apache.org>.
exceptionfactory commented on code in PR #7092:
URL: https://github.com/apache/nifi/pull/7092#discussion_r1157392825


##########
nifi-registry/nifi-registry-core/nifi-registry-web-api/src/main/java/org/apache/nifi/registry/web/api/BucketFlowResource.java:
##########
@@ -295,6 +295,48 @@ public Response createFlowVersion(
         return Response.status(Response.Status.OK).entity(createdSnapshot).build();
     }
 
+    @POST
+    @Path("{flowId}/versions/migrate")

Review Comment:
   Thanks for the reply and background @bbende.
   
   Unfortunately, JAX-RS does not appear to support selecting a resource method based on request query parameters, but introducing a `preserveSourceProperties` parameter in the existing method seems like it would provide a clear separation of behavior.
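The hunk above adds a separate `{flowId}/versions/migrate` endpoint; the suggestion is to fold that behavior into the existing method behind a `preserveSourceProperties` flag. In JAX-RS such a flag would be declared on the existing method with `@QueryParam("preserveSourceProperties")`, optionally combined with `@DefaultValue("false")`. The stdlib-only sketch below models only the resulting branching, assuming the flag's purpose is to keep metadata such as the author and timestamp of the exported snapshot; `SnapshotMetadataPolicy`, `Metadata` and `resolve` are hypothetical names, not NiFi Registry classes.

```java
// Hypothetical model of the branching a preserveSourceProperties flag could add to the
// existing create-version method; names are illustrative, not NiFi Registry classes.
final class SnapshotMetadataPolicy {

    // Minimal stand-in for snapshot metadata: who created the version and when.
    static final class Metadata {
        final String author;
        final long timestamp;

        Metadata(String author, long timestamp) {
            this.author = author;
            this.timestamp = timestamp;
        }
    }

    static Metadata resolve(Metadata source, String currentUser, long now, boolean preserveSourceProperties) {
        if (preserveSourceProperties) {
            // Migration: keep the author and timestamp recorded in the exported snapshot.
            return new Metadata(source.author, source.timestamp);
        }
        // Regular create: attribute the version to the calling user at the current time.
        return new Metadata(currentUser, now);
    }
}
```

Keeping a single method with a flag means both paths share the same validation and persistence code, and only the metadata assignment differs.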


