You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2022/05/20 19:32:37 UTC

[nifi] 05/06: NIFI-9967: Added FlowSerializationStrategy to determine which flow formats (XML,JSON) to save

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 1bcbc746ca6831a755ea34e3d47b417862050070
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Wed Apr 27 13:37:31 2022 -0400

    NIFI-9967: Added FlowSerializationStrategy to determine which flow formats (XML,JSON) to save
    
    This closes #6001.
    
    Signed-off-by: Kevin Doran <kd...@apache.org>
---
 .../nifi/controller/FlowSerializationStrategy.java | 31 ++++++++++++++++++++++
 .../nifi/controller/StandardFlowService.java       | 14 ++++++----
 .../persistence/StandardFlowConfigurationDAO.java  | 13 ++++++---
 .../spring/StandardFlowServiceFactoryBean.java     |  4 ++-
 .../nifi/controller/StandardFlowServiceTest.java   |  3 ++-
 .../nifi/integration/FrameworkIntegrationTest.java |  3 ++-
 .../apache/nifi/headless/HeadlessNiFiServer.java   |  4 ++-
 7 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSerializationStrategy.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSerializationStrategy.java
new file mode 100644
index 0000000000..d5e0f162e0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowSerializationStrategy.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller;
+
+public enum FlowSerializationStrategy { // selects which flow formats (XML, JSON) are written
+    WRITE_XML_ONLY, // writesXml() is true, writesJson() is false
+    WRITE_JSON_ONLY, // writesJson() is true, writesXml() is false
+    WRITE_XML_AND_JSON; // both writesXml() and writesJson() are true
+
+    public boolean writesJson() { // true for WRITE_JSON_ONLY and WRITE_XML_AND_JSON
+        return this == WRITE_JSON_ONLY || this == WRITE_XML_AND_JSON;
+    }
+
+    public boolean writesXml() { // true for WRITE_XML_ONLY and WRITE_XML_AND_JSON
+        return this == WRITE_XML_ONLY || this == WRITE_XML_AND_JSON;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 6d18032e03..c3eb799f5e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -159,9 +159,11 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             final NiFiProperties nifiProperties,
             final PropertyEncryptor encryptor,
             final RevisionManager revisionManager,
-            final Authorizer authorizer) throws IOException {
+            final Authorizer authorizer,
+            final FlowSerializationStrategy serializationStrategy) throws IOException {
 
-        return new StandardFlowService(controller, nifiProperties, null, encryptor, false, null, revisionManager, authorizer);
+        return new StandardFlowService(controller, nifiProperties, null, encryptor, false, null, revisionManager, authorizer,
+                serializationStrategy);
     }
 
     public static StandardFlowService createClusteredInstance(
@@ -173,7 +175,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             final RevisionManager revisionManager,
             final Authorizer authorizer) throws IOException {
 
-        return new StandardFlowService(controller, nifiProperties, senderListener, encryptor, true, coordinator, revisionManager, authorizer);
+        return new StandardFlowService(controller, nifiProperties, senderListener, encryptor, true, coordinator, revisionManager, authorizer,
+                FlowSerializationStrategy.WRITE_XML_AND_JSON);
     }
 
     private StandardFlowService(
@@ -184,7 +187,8 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
             final boolean configuredForClustering,
             final ClusterCoordinator clusterCoordinator,
             final RevisionManager revisionManager,
-            final Authorizer authorizer) throws IOException {
+            final Authorizer authorizer,
+            final FlowSerializationStrategy serializationStrategy) throws IOException {
 
         this.nifiProperties = nifiProperties;
         this.controller = controller;
@@ -192,7 +196,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
         gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS);
         autoResumeState = nifiProperties.getAutoResumeState();
 
-        dao = new StandardFlowConfigurationDAO(encryptor, nifiProperties, controller.getExtensionManager());
+        dao = new StandardFlowConfigurationDAO(encryptor, nifiProperties, controller.getExtensionManager(), serializationStrategy);
         this.clusterCoordinator = clusterCoordinator;
         if (clusterCoordinator != null) {
             clusterCoordinator.setFlowService(this);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardFlowConfigurationDAO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardFlowConfigurationDAO.java
index 97b2c45457..db3cdb8339 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardFlowConfigurationDAO.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/persistence/StandardFlowConfigurationDAO.java
@@ -18,6 +18,7 @@ package org.apache.nifi.persistence;
 
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.FlowSerializationStrategy;
 import org.apache.nifi.controller.MissingBundleException;
 import org.apache.nifi.controller.UninheritableFlowException;
 import org.apache.nifi.controller.XmlFlowSynchronizer;
@@ -62,11 +63,13 @@ public final class StandardFlowConfigurationDAO implements FlowConfigurationDAO
 
     private volatile boolean jsonFileExists;
     private final String clusterFlowSerializationFormat;
+    private final FlowSerializationStrategy serializationStrategy;
 
     public StandardFlowConfigurationDAO(final PropertyEncryptor encryptor, final NiFiProperties nifiProperties,
-                                        final ExtensionManager extensionManager) throws IOException {
+                                        final ExtensionManager extensionManager, final FlowSerializationStrategy serializationStrategy) throws IOException {
         this.nifiProperties = nifiProperties;
         this.clusterFlowSerializationFormat = nifiProperties.getProperty(CLUSTER_FLOW_SERIALIZATION_FORMAT);
+        this.serializationStrategy = serializationStrategy;
 
         xmlFile = nifiProperties.getFlowConfigurationFile();
         jsonFile = nifiProperties.getFlowConfigurationJsonFile();
@@ -184,8 +187,12 @@ public final class StandardFlowConfigurationDAO implements FlowConfigurationDAO
             throw new NullPointerException();
         }
 
-        saveJson(controller, archive);
-        saveXml(controller, archive);
+        if (serializationStrategy.writesJson()) {
+            saveJson(controller, archive);
+        }
+        if (serializationStrategy.writesXml()) {
+            saveXml(controller, archive);
+        }
     }
 
     private void saveJson(final FlowController controller, final boolean archive) throws IOException {
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StandardFlowServiceFactoryBean.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StandardFlowServiceFactoryBean.java
index 71db775bc6..13e70e4cd8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StandardFlowServiceFactoryBean.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/spring/StandardFlowServiceFactoryBean.java
@@ -20,6 +20,7 @@ import org.apache.nifi.authorization.Authorizer;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.FlowSerializationStrategy;
 import org.apache.nifi.controller.StandardFlowService;
 import org.apache.nifi.encrypt.PropertyEncryptor;
 import org.apache.nifi.services.FlowService;
@@ -65,7 +66,8 @@ public class StandardFlowServiceFactoryBean implements FactoryBean, ApplicationC
                     properties,
                     encryptor,
                     revisionManager,
-                    authorizer);
+                    authorizer,
+                    FlowSerializationStrategy.WRITE_XML_AND_JSON);
             }
         }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
index 3abe468418..98e68378f7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/StandardFlowServiceTest.java
@@ -94,7 +94,8 @@ public class StandardFlowServiceTest {
         extensionManager = mock(ExtensionDiscoveringManager.class);
         flowController = FlowController.createStandaloneInstance(mockFlowFileEventRepository, properties, authorizer, mockAuditService, mockEncryptor,
                                         new VolatileBulletinRepository(), variableRegistry, mock(FlowRegistryClient.class), extensionManager, statusHistoryRepository);
-        flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager, authorizer);
+        flowService = StandardFlowService.createStandaloneInstance(flowController, properties, mockEncryptor, revisionManager, authorizer,
+                FlowSerializationStrategy.WRITE_XML_AND_JSON);
         statusHistoryRepository = mock(StatusHistoryRepository.class);
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
index eeced0524e..ace4a6fae4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/FrameworkIntegrationTest.java
@@ -35,6 +35,7 @@ import org.apache.nifi.connectable.StandardConnection;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.FileSystemSwapManager;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.FlowSerializationStrategy;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.StandardSnippet;
 import org.apache.nifi.controller.XmlFlowSynchronizer;
@@ -334,7 +335,7 @@ public class FrameworkIntegrationTest {
         logger.info("Shutting down for restart....");
 
         // Save Flow to a byte array
-        final FlowConfigurationDAO flowDao = new StandardFlowConfigurationDAO(flowController.getEncryptor(), nifiProperties, getExtensionManager());
+        final FlowConfigurationDAO flowDao = new StandardFlowConfigurationDAO(flowController.getEncryptor(), nifiProperties, getExtensionManager(), FlowSerializationStrategy.WRITE_XML_AND_JSON);
         final ByteArrayOutputStream baos = new ByteArrayOutputStream();
         flowDao.save(flowController, baos);
         final byte[] flowBytes = baos.toByteArray();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
index c862b94a11..9ba5fbdc71 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-headless-server/src/main/java/org/apache/nifi/headless/HeadlessNiFiServer.java
@@ -30,6 +30,7 @@ import org.apache.nifi.authorization.exception.AuthorizerDestructionException;
 import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.controller.DecommissionTask;
 import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.FlowSerializationStrategy;
 import org.apache.nifi.controller.StandardFlowService;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
@@ -155,7 +156,8 @@ public class HeadlessNiFiServer implements NiFiServer {
                     props,
                     encryptor,
                     null, // revision manager
-                    authorizer);
+                    authorizer,
+                    FlowSerializationStrategy.WRITE_XML_ONLY);
 
             diagnosticsFactory = new BootstrapDiagnosticsFactory();
             ((BootstrapDiagnosticsFactory) diagnosticsFactory).setFlowController(flowController);