You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:57 UTC
[22/62] [abbrv] incubator-nifi git commit: Squashed commit of the
following:
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskTypesEntity.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskTypesEntity.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskTypesEntity.java
new file mode 100644
index 0000000..4b021ef
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTaskTypesEntity.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import java.util.Set;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of
+ * a response to the API. This particular entity holds a reference to a list of
+ * reporting task types.
+ */
+@XmlRootElement(name = "reportingTaskTypesEntity")
+public class ReportingTaskTypesEntity extends Entity {
+
+ private Set<DocumentedTypeDTO> reportingTaskTypes;
+
+ /**
+ * The list of reporting task types that are being serialized.
+ *
+ * @return
+ */
+ public Set<DocumentedTypeDTO> getReportingTaskTypes() {
+ return reportingTaskTypes;
+ }
+
+ public void setReportingTaskTypes(Set<DocumentedTypeDTO> reportingTaskTypes) {
+ this.reportingTaskTypes = reportingTaskTypes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTasksEntity.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTasksEntity.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTasksEntity.java
new file mode 100644
index 0000000..4699d5d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ReportingTasksEntity.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import java.util.Set;
+import javax.xml.bind.annotation.XmlRootElement;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of
+ * a response to the API. This particular entity holds a reference to a list of
+ * reporting tasks.
+ */
+@XmlRootElement(name = "reportingTasksEntity")
+public class ReportingTasksEntity extends Entity {
+
+ private Set<ReportingTaskDTO> reportingTasks;
+
+ /**
+ * The list of reporting tasks that are being serialized.
+ *
+ * @return
+ */
+ public Set<ReportingTaskDTO> getReportingTasks() {
+ return reportingTasks;
+ }
+
+ public void setReportingTasks(Set<ReportingTaskDTO> reportingTasks) {
+ this.reportingTasks = reportingTasks;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
index fcd3ea3..69ce8d9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
@@ -18,6 +18,7 @@ package org.apache.nifi.documentation.mock;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ComponentLog;
/**
* A Mock ControllerServiceInitializationContext so that ControllerServices can
@@ -37,4 +38,9 @@ public class MockControllerServiceInitializationContext implements ControllerSer
return new MockControllerServiceLookup();
}
+ @Override
+ public ComponentLog getLogger() {
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java
index f11bc68..5c60881 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceLookup.java
@@ -52,4 +52,14 @@ public class MockControllerServiceLookup implements ControllerServiceLookup {
return Collections.emptySet();
}
+ @Override
+ public boolean isControllerServiceEnabling(String serviceIdentifier) {
+ return false;
+ }
+
+ @Override
+ public String getControllerServiceName(String serviceIdentifier) {
+ return serviceIdentifier;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
index 910ce5a..dc6e236 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
@@ -19,6 +19,7 @@ package org.apache.nifi.documentation.mock;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -26,8 +27,6 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
* A Mock ReportingInitializationContext that can be used to initialize a
* ReportingTask for the purposes of documentation generation.
*
- * @author Alligator
- *
*/
public class MockReportingInitializationContext implements ReportingInitializationContext {
@@ -60,4 +59,9 @@ public class MockReportingInitializationContext implements ReportingInitializati
public SchedulingStrategy getSchedulingStrategy() {
return SchedulingStrategy.TIMER_DRIVEN;
}
+
+ @Override
+ public ComponentLog getLogger() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
index f9ee703..70dcc81 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/pom.xml
@@ -34,10 +34,6 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
- <artifactId>nifi-web-optimistic-locking</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
<artifactId>nifi-administration</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
index 012e7c7..c8c7206 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/cluster/context/ClusterContextThreadLocal.java
@@ -32,12 +32,7 @@ public class ClusterContextThreadLocal {
}
public static ClusterContext getContext() {
- ClusterContext ctx = contextHolder.get();
- if(ctx == null) {
- ctx = createEmptyContext();
- contextHolder.set(ctx);
- }
- return ctx;
+ return contextHolder.get();
}
public static void setContext(final ClusterContext context) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
deleted file mode 100644
index 90b8a37..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-web/src/main/java/org/apache/nifi/web/ClusterAwareOptimisticLockingManager.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.web;
-
-import org.apache.nifi.cluster.context.ClusterContext;
-import org.apache.nifi.cluster.context.ClusterContextThreadLocal;
-
-/**
- * An optimistic locking manager that provides for optimistic locking in a clustered
- * environment.
- *
- * @author unattributed
- */
-public class ClusterAwareOptimisticLockingManager implements OptimisticLockingManager {
-
- private final OptimisticLockingManager optimisticLockingManager;
-
- public ClusterAwareOptimisticLockingManager(final OptimisticLockingManager optimisticLockingManager) {
- this.optimisticLockingManager = optimisticLockingManager;
- }
-
- @Override
- public Revision checkRevision(Revision revision) throws InvalidRevisionException {
- final Revision currentRevision = getRevision();
- if(currentRevision.equals(revision) == false) {
- throw new InvalidRevisionException(String.format("Given revision %s does not match current revision %s.", revision, currentRevision));
- } else {
- return revision.increment(revision.getClientId());
- }
- }
-
- @Override
- public boolean isCurrent(Revision revision) {
- return getRevision().equals(revision);
- }
-
- @Override
- public Revision getRevision() {
- final ClusterContext ctx = ClusterContextThreadLocal.getContext();
- if(ctx == null || ctx.getRevision() == null) {
- return optimisticLockingManager.getRevision();
- } else {
- return ctx.getRevision();
- }
- }
-
- @Override
- public void setRevision(final Revision revision) {
- final ClusterContext ctx = ClusterContextThreadLocal.getContext();
- if(ctx != null) {
- ctx.setRevision(revision);
- }
- optimisticLockingManager.setRevision(revision);
- }
-
- @Override
- public Revision incrementRevision() {
- final Revision currentRevision = getRevision();
- final Revision incRevision = currentRevision.increment();
- setRevision(incRevision);
- return incRevision;
- }
-
- @Override
- public Revision incrementRevision(final String clientId) {
- final Revision currentRevision = getRevision();
- final Revision incRevision = currentRevision.increment(clientId);
- setRevision(incRevision);
- return incRevision;
- }
-
- @Override
- public String getLastModifier() {
- return optimisticLockingManager.getLastModifier();
- }
-
- @Override
- public void setLastModifier(final String lastModifier) {
- optimisticLockingManager.setLastModifier(lastModifier);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
index 7b6a418..bdff00f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/pom.xml
@@ -47,6 +47,10 @@
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-web-optimistic-locking</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-core</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
index eedb88f..c17b429 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/ClusterDataFlow.java
@@ -27,14 +27,25 @@ import org.apache.nifi.cluster.protocol.StandardDataFlow;
public class ClusterDataFlow {
private final StandardDataFlow dataFlow;
-
private final NodeIdentifier primaryNodeId;
+ private final byte[] controllerServices;
+ private final byte[] reportingTasks;
- public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId) {
+ public ClusterDataFlow(final StandardDataFlow dataFlow, final NodeIdentifier primaryNodeId, final byte[] controllerServices, final byte[] reportingTasks) {
this.dataFlow = dataFlow;
this.primaryNodeId = primaryNodeId;
+ this.controllerServices = controllerServices;
+ this.reportingTasks = reportingTasks;
}
+ public byte[] getControllerServices() {
+ return controllerServices;
+ }
+
+ public byte[] getReportingTasks() {
+ return reportingTasks;
+ }
+
public NodeIdentifier getPrimaryNodeId() {
return primaryNodeId;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
index 339d904..082d65e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/DataFlowManagementService.java
@@ -17,6 +17,7 @@
package org.apache.nifi.cluster.flow;
import java.util.Set;
+
import org.apache.nifi.cluster.protocol.NodeIdentifier;
/**
@@ -67,6 +68,22 @@ public interface DataFlowManagementService {
void updatePrimaryNode(NodeIdentifier nodeId) throws DaoException;
/**
+ * Updates the dataflow with the given serialized form of the Controller Services that are to exist on the NCM.
+ *
+ * @param serializedControllerServices
+ * @throws DaoException
+ */
+ void updateControllerServices(byte[] serializedControllerServices) throws DaoException;
+
+ /**
+ * Updates the dataflow with the given serialized form of Reporting Tasks that are to exist on the NCM.
+ *
+ * @param serviceNodes
+ * @throws DaoException
+ */
+ void updateReportingTasks(byte[] serializedReportingTasks) throws DaoException;
+
+ /**
* Sets the state of the flow.
*
* @param flowState the state
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
index 72b594a..dd9d2a3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@ -111,6 +111,8 @@ public class DataFlowDaoImpl implements DataFlowDao {
public static final String FLOW_XML_FILENAME = "flow.xml";
public static final String TEMPLATES_FILENAME = "templates.xml";
public static final String SNIPPETS_FILENAME = "snippets.xml";
+ public static final String CONTROLLER_SERVICES_FILENAME = "controller-services.xml";
+ public static final String REPORTING_TASKS_FILENAME = "reporting-tasks.xml";
public static final String CLUSTER_INFO_FILENAME = "cluster-info.xml";
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowDaoImpl.class));
@@ -408,13 +410,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
final File stateFile = new File(dir, FLOW_PACKAGE);
stateFile.createNewFile();
- final byte[] flowBytes = getEmptyFlowBytes();
- final byte[] templateBytes = new byte[0];
- final byte[] snippetBytes = new byte[0];
- final DataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
-
- final ClusterMetadata clusterMetadata = new ClusterMetadata();
- writeDataFlow(stateFile, dataFlow, clusterMetadata);
+ writeDataFlow(stateFile, new ClusterDataFlow(null, null, new byte[0], new byte[0]), new ClusterMetadata());
return stateFile;
}
@@ -479,7 +475,9 @@ public class DataFlowDaoImpl implements DataFlowDao {
byte[] templateBytes = new byte[0];
byte[] snippetBytes = new byte[0];
byte[] clusterInfoBytes = new byte[0];
-
+ byte[] controllerServiceBytes = new byte[0];
+ byte[] reportingTaskBytes = new byte[0];
+
try (final InputStream inStream = new FileInputStream(file);
final TarArchiveInputStream tarIn = new TarArchiveInputStream(new BufferedInputStream(inStream))) {
TarArchiveEntry tarEntry;
@@ -501,6 +499,14 @@ public class DataFlowDaoImpl implements DataFlowDao {
clusterInfoBytes = new byte[(int) tarEntry.getSize()];
StreamUtils.fillBuffer(tarIn, clusterInfoBytes, true);
break;
+ case CONTROLLER_SERVICES_FILENAME:
+ controllerServiceBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, controllerServiceBytes, true);
+ break;
+ case REPORTING_TASKS_FILENAME:
+ reportingTaskBytes = new byte[(int) tarEntry.getSize()];
+ StreamUtils.fillBuffer(tarIn, reportingTaskBytes, true);
+ break;
default:
throw new DaoException("Found Unexpected file in dataflow configuration: " + tarEntry.getName());
}
@@ -518,7 +524,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
final StandardDataFlow dataFlow = new StandardDataFlow(flowBytes, templateBytes, snippetBytes);
dataFlow.setAutoStartProcessors(autoStart);
- return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId());
+ return new ClusterDataFlow(dataFlow, (clusterMetadata == null) ? null : clusterMetadata.getPrimaryNodeId(), controllerServiceBytes, reportingTaskBytes);
}
private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow) throws IOException, JAXBException {
@@ -536,7 +542,7 @@ public class DataFlowDaoImpl implements DataFlowDao {
clusterMetadata.setPrimaryNodeId(clusterDataFlow.getPrimaryNodeId());
// write to disk
- writeDataFlow(file, dataFlow, clusterMetadata);
+ writeDataFlow(file, clusterDataFlow, clusterMetadata);
}
private void writeTarEntry(final TarArchiveOutputStream tarOut, final String filename, final byte[] bytes) throws IOException {
@@ -547,14 +553,23 @@ public class DataFlowDaoImpl implements DataFlowDao {
tarOut.closeArchiveEntry();
}
- private void writeDataFlow(final File file, final DataFlow dataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException {
+ private void writeDataFlow(final File file, final ClusterDataFlow clusterDataFlow, final ClusterMetadata clusterMetadata) throws IOException, JAXBException {
try (final OutputStream fos = new FileOutputStream(file);
final TarArchiveOutputStream tarOut = new TarArchiveOutputStream(new BufferedOutputStream(fos))) {
- writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
- writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates());
- writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
+ final DataFlow dataFlow = clusterDataFlow.getDataFlow();
+ if ( dataFlow == null ) {
+ writeTarEntry(tarOut, FLOW_XML_FILENAME, getEmptyFlowBytes());
+ writeTarEntry(tarOut, TEMPLATES_FILENAME, new byte[0]);
+ writeTarEntry(tarOut, SNIPPETS_FILENAME, new byte[0]);
+ } else {
+ writeTarEntry(tarOut, FLOW_XML_FILENAME, dataFlow.getFlow());
+ writeTarEntry(tarOut, TEMPLATES_FILENAME, dataFlow.getTemplates());
+ writeTarEntry(tarOut, SNIPPETS_FILENAME, dataFlow.getSnippets());
+ }
+ writeTarEntry(tarOut, CONTROLLER_SERVICES_FILENAME, clusterDataFlow.getControllerServices());
+ writeTarEntry(tarOut, REPORTING_TASKS_FILENAME, clusterDataFlow.getReportingTasks());
final ByteArrayOutputStream baos = new ByteArrayOutputStream(256);
writeClusterMetadata(clusterMetadata, baos);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
index e135af3..1bb8ca3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.java
@@ -41,7 +41,6 @@ import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.util.FormatUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,17 +153,74 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
final StandardDataFlow dataFlow;
+ final byte[] controllerServiceBytes;
+ final byte[] reportingTaskBytes;
if (existingClusterDataFlow == null) {
dataFlow = null;
+ controllerServiceBytes = new byte[0];
+ reportingTaskBytes = new byte[0];
} else {
dataFlow = existingClusterDataFlow.getDataFlow();
+ controllerServiceBytes = existingClusterDataFlow.getControllerServices();
+ reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
}
- flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId));
+ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
} finally {
resourceLock.unlock("updatePrimaryNode");
}
}
+
+
+ @Override
+ public void updateControllerServices(final byte[] controllerServiceBytes) throws DaoException {
+ resourceLock.lock();
+ try {
+ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+
+ final StandardDataFlow dataFlow;
+ final byte[] reportingTaskBytes;
+ final NodeIdentifier nodeId;
+ if (existingClusterDataFlow == null) {
+ dataFlow = null;
+ nodeId = null;
+ reportingTaskBytes = new byte[0];
+ } else {
+ dataFlow = existingClusterDataFlow.getDataFlow();
+ nodeId = existingClusterDataFlow.getPrimaryNodeId();
+ reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
+ }
+
+ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
+ } finally {
+ resourceLock.unlock("updateControllerServices");
+ }
+ }
+
+ @Override
+ public void updateReportingTasks(final byte[] reportingTaskBytes) throws DaoException {
+ resourceLock.lock();
+ try {
+ final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
+
+ final StandardDataFlow dataFlow;
+ final byte[] controllerServiceBytes;
+ final NodeIdentifier nodeId;
+ if (existingClusterDataFlow == null) {
+ dataFlow = null;
+ nodeId = null;
+ controllerServiceBytes = null;
+ } else {
+ dataFlow = existingClusterDataFlow.getDataFlow();
+ nodeId = existingClusterDataFlow.getPrimaryNodeId();
+ controllerServiceBytes = existingClusterDataFlow.getControllerServices();
+ }
+
+ flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
+ } finally {
+ resourceLock.unlock("updateControllerServices");
+ }
+ }
@Override
public PersistedFlowState getPersistedFlowState() {
@@ -303,9 +359,10 @@ public class DataFlowManagementServiceImpl implements DataFlowManagementService
final ClusterDataFlow existingClusterDataFlow = flowDao.loadDataFlow();
final ClusterDataFlow currentClusterDataFlow;
if (existingClusterDataFlow == null) {
- currentClusterDataFlow = new ClusterDataFlow(dataFlow, null);
+ currentClusterDataFlow = new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]);
} else {
- currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId());
+ currentClusterDataFlow = new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(),
+ existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks());
}
flowDao.saveDataFlow(currentClusterDataFlow);
flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
index 3f966e5..8bc73ab 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -192,6 +192,20 @@ public class NodeResponse {
}
/**
+ * If this node response has been merged returns the updated entity,
+ * otherwise null. Also returns null if hasThrowable() is true. The
+ * intent of this method is to support getting the response entity
+ * when it was already consumed during the merge operation. In this
+ * case the client response rom getClientResponse() will not support
+ * a getEntity(...) or getEntityInputStream() call.
+ *
+ * @return
+ */
+ public Entity getUpdatedEntity() {
+ return updatedEntity;
+ }
+
+ /**
* Creates a Response by mapping the ClientResponse values to it. Since the
* ClientResponse's input stream can only be read once, this method should
* only be called once. Furthermore, the caller should not have already read