You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2018/04/02 18:42:11 UTC
[2/3] nifi-minifi git commit: MINIFI-448 Adds C2 Server service layer
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileHeartbeatPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileHeartbeatPersistenceProvider.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileHeartbeatPersistenceProvider.java
new file mode 100644
index 0000000..16a2d1e
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileHeartbeatPersistenceProvider.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.provider.persistence;
+
+import org.apache.nifi.minifi.c2.api.provider.heartbeat.HeartbeatPersistenceProvider;
+import org.apache.nifi.minifi.c2.model.C2Heartbeat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * A simple, in-memory "persistence" provider in order to test the service layer.
+ *
+ * This is not designed for real use outside of development. For example:
+ * - it only keeps an in-memory record of saved entities, there is no real persistence
+ * - it does not support transactions
+ * - it does not clone objects on save/retrieval, so any modifications made after interacting with this service
+ * also modify the "persisted" copies.
+ *
+ * TODO, deep copy objects on save/get so that they cannot be modified outside this class without modifying the persisted copy.
+ */
+class VolatileHeartbeatPersistenceProvider implements HeartbeatPersistenceProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(VolatileHeartbeatPersistenceProvider.class);
+
+ private Map<String, C2Heartbeat> heartbeats = new ConcurrentHashMap<>();
+
+ @Override
+ public long getCount() {
+ return heartbeats.size();
+ }
+
+ @Override
+ public C2Heartbeat save(C2Heartbeat heartbeat) {
+
+ if (heartbeat == null || heartbeat.getIdentifier() == null) {
+ throw new IllegalArgumentException("Heartbeat must be not null and must have an id in order to be saved.");
+ }
+
+ // TODO, atomic transaction
+ heartbeats.put(heartbeat.getIdentifier(), heartbeat);
+
+ logger.debug("Saved heartbeat with id={}", heartbeat.getIdentifier());
+ return heartbeat;
+
+ }
+
+ @Override
+ public Iterable<C2Heartbeat> getAll() {
+ return new ArrayList<>(heartbeats.values());
+ }
+
+ @Override
+ public Iterable<C2Heartbeat> getByAgent(String agentId) {
+ if (agentId == null) {
+ throw new IllegalArgumentException("Agent id cannot be null");
+ }
+
+ return heartbeats.values().stream()
+ .filter(hb -> hb.getAgentInfo() != null)
+ .filter(hb -> agentId.equals(hb.getAgentInfo().getIdentifier()) )
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean existsById(String heartbeatId) {
+ if (heartbeatId == null) {
+ throw new IllegalArgumentException("Heartbeat id cannot be null");
+ }
+ return heartbeats.containsKey(heartbeatId);
+ }
+
+ @Override
+ public Optional<C2Heartbeat> getById(String heartbeatId) {
+ if (heartbeatId == null) {
+ throw new IllegalArgumentException("Heartbeat id cannot be null");
+ }
+ return Optional.ofNullable(heartbeats.get(heartbeatId));
+ }
+
+ @Override
+ public void deleteById(String heartbeatId) {
+ if (heartbeatId == null) {
+ throw new IllegalArgumentException("Heartbeat id cannot be null");
+ }
+ heartbeats.remove(heartbeatId);
+ }
+
+ @Override
+ public void delete(C2Heartbeat heartbeat) {
+ if (heartbeat == null || heartbeat.getIdentifier() == null) {
+ throw new IllegalArgumentException("heartbeat cannot be null and must have an id");
+ }
+ deleteById(heartbeat.getIdentifier());
+ }
+
+ @Override
+ public void deleteAll() {
+ heartbeats.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileOperationPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileOperationPersistenceProvider.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileOperationPersistenceProvider.java
new file mode 100644
index 0000000..fa74f77
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileOperationPersistenceProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.provider.persistence;
+
+import org.apache.nifi.minifi.c2.api.provider.operations.OperationPersistenceProvider;
+import org.apache.nifi.minifi.c2.model.OperationRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * A simple, in-memory "persistence" provider in order to test the service layer.
+ *
+ * This is not designed for real use outside of development. For example:
+ * - it only keeps an in-memory record of saved entities, there is no real persistence
+ * - it does not support transactions
+ * - it does not clone objects on save/retrieval, so any modifications made after interacting with this service
+ * also modify the "persisted" copies.
+ *
+ * TODO, deep copy objects on save/get so that they cannot be modified outside this class without modifying the persisted copy.
+ */
+public class VolatileOperationPersistenceProvider implements OperationPersistenceProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(VolatileOperationPersistenceProvider.class);
+
+ private Map<String, OperationRequest> operations = new ConcurrentHashMap<>();
+
+ @Override
+ public long getCount() {
+ return operations.size();
+ }
+
+
+ @Override
+ public OperationRequest save(OperationRequest operationRequest) {
+ if (operationRequest == null || operationRequest.getOperation() == null || operationRequest.getOperation().getIdentifier() == null) {
+ throw new IllegalArgumentException("operation must be not null and have id");
+ }
+ operations.put(operationRequest.getOperation().getIdentifier(), operationRequest);
+ return operationRequest;
+ }
+
+ @Override
+ public Iterable<OperationRequest> getAll() {
+ return new ArrayList<>(operations.values());
+ }
+
+ @Override
+ public Iterable<OperationRequest> getByAgent(String agentId) {
+ if (agentId == null) {
+ throw new IllegalArgumentException("Agent id cannot be null");
+ }
+ return operations.values().stream()
+ .filter(operation -> agentId.equals(operation.getTargetAgentIdentifier()))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean existsById(String operationId) {
+ if (operationId == null) {
+ throw new IllegalArgumentException("operationId cannot be null");
+ }
+ return operations.containsKey(operationId);
+ }
+
+ @Override
+ public Optional<OperationRequest> getById(String operationId) {
+ if (operationId == null) {
+ throw new IllegalArgumentException("operationId cannot be null");
+ }
+ return Optional.ofNullable(operations.get(operationId));
+ }
+
+ @Override
+ public void deleteById(String operationId) {
+ if (operationId == null) {
+ throw new IllegalArgumentException("operationId cannot be null");
+ }
+ operations.remove(operationId);
+ }
+
+ @Override
+ public void delete(OperationRequest operationRequest) {
+ if (operationRequest == null || operationRequest.getOperation() == null || operationRequest.getOperation().getIdentifier() == null) {
+ throw new IllegalArgumentException("operation must be not null and have id");
+ }
+ operations.remove(operationRequest.getOperation().getIdentifier());
+ }
+
+ @Override
+ public void deleteAll() {
+ operations.clear();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatilePersistenceProviderFactory.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatilePersistenceProviderFactory.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatilePersistenceProviderFactory.java
new file mode 100644
index 0000000..95d584e
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatilePersistenceProviderFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.provider.persistence;
+
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentClassPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentManifestPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.device.DevicePersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.heartbeat.HeartbeatPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.operations.OperationPersistenceProvider;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * TODO, replace this with a factory that is externally configurable
+ */
+@Configuration
+public class VolatilePersistenceProviderFactory {
+
+ private final VolatileAgentPersistenceProvider agentPersistenceProvider;
+ private final VolatileAgentClassPersistenceProvider agentClassPersistenceProvider;
+ private final VolatileAgentManifestPersistenceProvider agentManifestPersistenceProvider;
+ private final VolatileDevicePersistenceProvider devicePersistenceProvider;
+ private final VolatileHeartbeatPersistenceProvider heartbeatPersistenceProvider;
+ private final VolatileOperationPersistenceProvider operationPersistenceProvider;
+
+ public VolatilePersistenceProviderFactory() {
+ agentPersistenceProvider = new VolatileAgentPersistenceProvider();
+ agentClassPersistenceProvider = new VolatileAgentClassPersistenceProvider();
+ agentManifestPersistenceProvider = new VolatileAgentManifestPersistenceProvider();
+ devicePersistenceProvider = new VolatileDevicePersistenceProvider();
+ heartbeatPersistenceProvider = new VolatileHeartbeatPersistenceProvider();
+ operationPersistenceProvider = new VolatileOperationPersistenceProvider();
+ }
+
+ @Bean
+ public DevicePersistenceProvider getDevicePersistenceProvider() {
+ return devicePersistenceProvider;
+ }
+
+ @Bean
+ public OperationPersistenceProvider getOperationPersistenceProvider() {
+ return operationPersistenceProvider;
+ }
+
+ @Bean
+ public HeartbeatPersistenceProvider getHeartBeatPersistenceProvider() {
+ return heartbeatPersistenceProvider;
+ }
+
+ @Bean
+ public AgentClassPersistenceProvider getAgentClassPersistenceProvider() {
+ return agentClassPersistenceProvider;
+ }
+
+ @Bean
+ public AgentManifestPersistenceProvider getAgentManifestPersistenceProvider() {
+ return agentManifestPersistenceProvider;
+ }
+
+ @Bean
+ public AgentPersistenceProvider getAgentPersistenceProvider() {
+ return agentPersistenceProvider;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2ProtocolService.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2ProtocolService.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2ProtocolService.java
new file mode 100644
index 0000000..54f5a3e
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2ProtocolService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service;
+
+import org.apache.nifi.minifi.c2.model.C2Heartbeat;
+import org.apache.nifi.minifi.c2.model.C2HeartbeatResponse;
+import org.apache.nifi.minifi.c2.model.C2OperationAck;
+
+public interface C2ProtocolService {
+
+ C2HeartbeatResponse processHeartbeat(C2Heartbeat heartbeat);
+
+ void processOperationAck(C2OperationAck operationAck);
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java
index c0acb57..05c3f21 100644
--- a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java
@@ -5,9 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
* http://www.apache.org/licenses/LICENSE-2.0
- *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,44 +14,97 @@
*/
package org.apache.nifi.minifi.c2.core.service;
-import org.apache.nifi.minifi.c2.core.persistence.C2Repository;
-import org.apache.nifi.minifi.c2.model.TestObject;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
+import org.apache.nifi.minifi.c2.model.Agent;
+import org.apache.nifi.minifi.c2.model.AgentClass;
+import org.apache.nifi.minifi.c2.model.AgentManifest;
+import org.apache.nifi.minifi.c2.model.Device;
+import org.apache.nifi.minifi.c2.model.OperationRequest;
+import org.apache.nifi.minifi.c2.model.OperationState;
-import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+
+/**
+ * Standard CRUD method semantics apply to these methods. That is:
+ *
+ * - getWidgets: return List of Widget,
+ * or empty List if no Widgets exist
+ *
+ * - getWidget(String): return Optional Widget with matching id,
+ * or empty Optional if no Widget with matching id exists
+ *
+ * - createWidget(Widget): create Widget and assign it a generated id,
+ * return created widget (including any fields that got generated such as id or creation timestamp),
+ * throw IllegalStateException if Widget with matching id already exists
+ * throw IllegalArgumentException if Widget is not valid (e.g., missing required fields)
+ *
+ * - updateWidget(Widget): update Widget with the id to match the incoming Widget
+ * return updated Widget
+ * throw IllegalArgumentException if Widget is not valid (e.g., missing required fields. Note, id is required when updating existing Widget)
+ * throw ResourceNotFoundException if no Widget with matching id exists
+ *
+ * - deleteWidget(String): delete Widget with id,
+ * return Widget that was deleted,
+ * throw ResourceNotFoundException if no Widget with matching id exists
+ *
+ * Any invalid arguments (eg, null where required) will result in an IllegalArgumentException
+ */
+public interface C2Service {
+
+ //**********************************
+ //*** Agent Class CRUD methods ***
+ //**********************************
+
+ AgentClass createAgentClass(AgentClass agentClass);
+ List<AgentClass> getAgentClasses();
+ Optional<AgentClass> getAgentClass(String name);
+ AgentClass updateAgentClass(AgentClass agentClass);
+ AgentClass deleteAgentClass(String name);
+
+
+ //*************************************
+ //*** Agent Manifest CRUD methods ***
+ //*************************************
+
+ AgentManifest createAgentManifest(AgentManifest agentManifest);
+ List<AgentManifest> getAgentManifests();
+ List<AgentManifest> getAgentManifests(String agentClassName);
+ Optional<AgentManifest> getAgentManifest(String manifestId);
+ AgentManifest deleteAgentManifest(String manifestId);
+
-@Service
-public class C2Service {
+ //****************************
+ //*** Agent CRUD methods ***
+ //****************************
- C2Repository c2Repository;
+ Agent createAgent(Agent agent);
+ List<Agent> getAgents();
+ List<Agent> getAgents(String agentClassName);
+ Optional<Agent> getAgent(String agentId);
+ Agent updateAgent(Agent agent);
+ Agent deleteAgent(String agentId);
- @Autowired
- public C2Service(C2Repository c2Repository) {
- this.c2Repository = c2Repository;
- }
- public TestObject createTestObject(TestObject testObject) {
- return c2Repository.createTestObject(testObject);
- }
+ //*****************************
+ //*** Device CRUD methods ***
+ //*****************************
- public List<TestObject> getTestObjects() {
- List<TestObject> objects = new ArrayList<>();
- c2Repository.getTestObjects().forEachRemaining(objects::add);
- return objects;
- }
+ Device createDevice(Device device);
+ List<Device> getDevices();
+ Optional<Device> getDevice(String deviceId);
+ Device updateDevice(Device device);
+ Device deleteDevice(String deviceId);
- public TestObject getTestObjectById(String identifier) {
- return c2Repository.getTestObjectById(identifier);
- }
- public TestObject updateTestObject(TestObject testObject) {
- return c2Repository.updateTestObject(testObject);
- }
+ //***********************************
+ //*** C2 Operation CRUD methods ***
+ //***********************************
- public TestObject deleteTestObject(String identifier) {
- return c2Repository.deleteTestObject(identifier);
- }
+ OperationRequest createOperation(OperationRequest operationRequest);
+ List<OperationRequest> getOperations();
+ List<OperationRequest> getOperationsByAgent(String agentId);
+ Optional<OperationRequest> getOperation(String operationId);
+ OperationRequest updateOperationState(String operationId, OperationState state);
+ OperationRequest deleteOperation(String operationId);
}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolService.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolService.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolService.java
new file mode 100644
index 0000000..60e56b9
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolService.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service;
+
+import org.apache.nifi.minifi.c2.api.provider.heartbeat.HeartbeatPersistenceProvider;
+import org.apache.nifi.minifi.c2.model.Agent;
+import org.apache.nifi.minifi.c2.model.AgentClass;
+import org.apache.nifi.minifi.c2.model.AgentInfo;
+import org.apache.nifi.minifi.c2.model.C2Heartbeat;
+import org.apache.nifi.minifi.c2.model.C2HeartbeatResponse;
+import org.apache.nifi.minifi.c2.model.C2Operation;
+import org.apache.nifi.minifi.c2.model.C2OperationAck;
+import org.apache.nifi.minifi.c2.model.Device;
+import org.apache.nifi.minifi.c2.model.DeviceInfo;
+import org.apache.nifi.minifi.c2.model.OperationRequest;
+import org.apache.nifi.minifi.c2.model.OperationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+@Service
+public class StandardC2ProtocolService implements C2ProtocolService {
+
+ private static final Logger logger = LoggerFactory.getLogger(StandardC2ProtocolService.class);
+
+ private C2Service c2Service;
+ private HeartbeatPersistenceProvider heartbeatPersistenceProvider;
+
+
+ @Autowired
+ public StandardC2ProtocolService(
+ C2Service c2Service,
+ HeartbeatPersistenceProvider heartbeatPersistenceProvider) {
+ this.c2Service = c2Service;
+ this.heartbeatPersistenceProvider = heartbeatPersistenceProvider;
+ }
+
+ @Override
+ public C2HeartbeatResponse processHeartbeat(C2Heartbeat heartbeat) {
+
+ if (heartbeat == null) {
+ throw new IllegalArgumentException("Heartbeat cannot be null");
+ }
+
+ heartbeat.setTimestamp(System.currentTimeMillis());
+ heartbeat.setIdentifier(UUID.randomUUID().toString());
+
+ logger.info("Processing heartbeat: {}", heartbeat.toString());
+
+ persistHeartbeat(heartbeat);
+ processHeartbeatDeviceInfo(heartbeat);
+ processHeartbeatAgentInfo(heartbeat);
+
+ C2HeartbeatResponse response = new C2HeartbeatResponse();
+ List<C2Operation> requestedOperations = new ArrayList<>(getQueuedC2Operations(heartbeat));
+ // TODO, detect if NiFi Registry integration is configured,
+ // and if so, call Flow Retrieval Service to detect if we need to add a Flow Update operation
+ if (!requestedOperations.isEmpty()) {
+ response.setRequestedOperations(requestedOperations);
+ for (C2Operation op : requestedOperations) {
+ try {
+ c2Service.updateOperationState(op.getIdentifier(), OperationState.DEPLOYED);
+ } catch (Exception e) {
+ logger.warn("Encountered exception while updating operation state", e);
+ }
+ }
+ }
+
+ return response;
+ }
+
+ @Override
+ public void processOperationAck(C2OperationAck operationAck) {
+
+ try {
+ // TODO, add operation status (eg success/failed) to operationAck. For now, assume ack indicates successful execution.
+ c2Service.updateOperationState(operationAck.getOperationId(), OperationState.DONE);
+ } catch (Exception e) {
+ logger.warn("Encountered exception while processing operation ack", e);
+ }
+
+ }
+
+ private void persistHeartbeat(C2Heartbeat heartbeat) {
+ try {
+ heartbeatPersistenceProvider.save(heartbeat);
+ } catch (Exception e) {
+ logger.warn("Encountered exception while trying to record heartbeat", e);
+ }
+ }
+
+ private void processHeartbeatDeviceInfo(C2Heartbeat heartbeat) {
+ try {
+ final String deviceIdentifier;
+ final DeviceInfo deviceInfo = heartbeat.getDeviceInfo();
+ if (deviceInfo != null) {
+ deviceIdentifier = deviceInfo.getIdentifier();
+
+ if (deviceIdentifier == null) {
+ logger.info("Could not register device without identifier: {} ", deviceInfo);
+ return;
+ }
+
+ logger.debug("Creating/updating device info for deviceId={}", deviceIdentifier);
+ Optional<Device> existingDevice = c2Service.getDevice(deviceIdentifier);
+ boolean deviceExists = (existingDevice.isPresent());
+ Device device = existingDevice.orElse(new Device());
+ if (!deviceExists) {
+ device.setIdentifier(deviceIdentifier);
+ device.setFirstSeen(heartbeat.getTimestamp());
+ }
+ device.setLastSeen(heartbeat.getTimestamp());
+ device.setSystemInfo(deviceInfo.getSystemInfo());
+ device.setNetworkInfo(deviceInfo.getNetworkInfo());
+
+ if (!deviceExists) {
+ c2Service.createDevice(device);
+ } else {
+ c2Service.updateDevice(device);
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Encountered exception while trying to update device info", e);
+ }
+ }
+
+ private void processHeartbeatAgentInfo(C2Heartbeat heartbeat) {
+ try {
+ final String agentIdentifier;
+ final AgentInfo agentInfo = heartbeat.getAgentInfo();
+ if (agentInfo != null) {
+ agentIdentifier = agentInfo.getIdentifier();
+
+ if (agentIdentifier == null) {
+ logger.info("Could not register agent without identifier: {} ", agentInfo);
+ return;
+ }
+
+ logger.debug("Creating/updating agent info for agentId={}", agentIdentifier);
+ Optional<Agent> existingAgent = c2Service.getAgent(agentIdentifier);
+ boolean agentExists = existingAgent.isPresent();
+ final Agent agent = existingAgent.orElse(new Agent());
+ if (!agentExists) {
+ agent.setIdentifier(agentIdentifier);
+ agent.setFirstSeen(heartbeat.getTimestamp());
+ }
+ agent.setLastSeen(heartbeat.getTimestamp());
+ if (agentInfo.getAgentClass() != null) {
+ agent.setAgentClass(agentInfo.getAgentClass());
+ }
+ if (agentInfo.getAgentManifest() != null) {
+ agent.setAgentManifest(agentInfo.getAgentManifest());
+ }
+ if (agentInfo.getStatus() != null) {
+ agent.setStatus(agentInfo.getStatus());
+ }
+
+ // Create agent manifest if this is the first time we've seen it
+ String agentManifestIdentifier = null;
+ if (agentInfo.getAgentManifest() != null) {
+ agent.setAgentManifest(agentInfo.getAgentManifest());
+ agentManifestIdentifier = agent.getAgentManifest().getIdentifier();
+
+ // Note, a client-set agent manifest identifier is required so that we don't create infinite manifests
+ // Alternatively, we need some way of deterministically generating a manifest id from the manifest contents,
+ // So that two heartbeats with a matching agent manifest do not register separate manifests
+ if (!c2Service.getAgentManifest(agentManifestIdentifier).isPresent()) {
+ c2Service.createAgentManifest(agent.getAgentManifest());
+ }
+ }
+
+ // Create agent class if this is the first time we've seen it
+ if (agentInfo.getAgentClass() != null) {
+ agent.setAgentClass(agentInfo.getAgentClass());
+ Optional<AgentClass> existingAgentClass = c2Service.getAgentClass(agent.getAgentClass());
+ if (existingAgentClass.isPresent()) {
+ if (agentManifestIdentifier != null) {
+ AgentClass updatedAgentClass = existingAgentClass.get();
+ Set<String> agentManifests = updatedAgentClass.getAgentManifests();
+ if (agentManifests == null) {
+ agentManifests = new HashSet<>();
+ }
+ agentManifests.add(agentManifestIdentifier);
+ c2Service.updateAgentClass(updatedAgentClass);
+ }
+ } else {
+ AgentClass newAgentClass = new AgentClass();
+ newAgentClass.setName(agent.getAgentClass());
+ newAgentClass.setAgentManifests(agentManifestIdentifier != null ? new HashSet<>(Collections.singleton(agentManifestIdentifier)) : null);
+ c2Service.createAgentClass(newAgentClass);
+ }
+ }
+
+ if (!agentExists) {
+ c2Service.createAgent(agent);
+ } else {
+ c2Service.updateAgent(agent);
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("Encountered exception while trying to update agent info", e);
+ }
+ }
+
+ private List<C2Operation> getQueuedC2Operations(C2Heartbeat heartbeat) {
+
+ final List<C2Operation> queuedC2Operations = new ArrayList<>();
+ if (heartbeat.getAgentInfo() != null) {
+ final String agentIdentifier = heartbeat.getAgentInfo().getIdentifier();
+ if (agentIdentifier != null) {
+ for (OperationRequest op : c2Service.getOperationsByAgent(agentIdentifier)) {
+ if (op.getState() == OperationState.QUEUED) {
+ queuedC2Operations.add(op.getOperation());
+ }
+ }
+ }
+ }
+ return queuedC2Operations;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2Service.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2Service.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2Service.java
new file mode 100644
index 0000000..31d070d
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2Service.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service;
+
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentClassPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentManifestPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.device.DevicePersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.operations.OperationPersistenceProvider;
+import org.apache.nifi.minifi.c2.core.exception.ResourceNotFoundException;
+import org.apache.nifi.minifi.c2.model.Agent;
+import org.apache.nifi.minifi.c2.model.AgentClass;
+import org.apache.nifi.minifi.c2.model.AgentManifest;
+import org.apache.nifi.minifi.c2.model.Device;
+import org.apache.nifi.minifi.c2.model.OperationRequest;
+import org.apache.nifi.minifi.c2.model.OperationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.validation.ConstraintViolation;
+import javax.validation.ConstraintViolationException;
+import javax.validation.Validator;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Service
+public class StandardC2Service implements C2Service {
+
+ private static final Logger logger = LoggerFactory.getLogger(StandardC2Service.class);
+
+ private final AgentPersistenceProvider agentPersistenceProvider;
+ private final AgentClassPersistenceProvider agentClassPersistenceProvider;
+ private final AgentManifestPersistenceProvider agentManifestPersistenceProvider;
+ private final DevicePersistenceProvider devicePersistenceProvider;
+ private final OperationPersistenceProvider operationPersistenceProvider;
+ private final Validator validator;
+
+ private final Lock agentLock = new ReentrantLock();
+ private final Lock agentClassLock = new ReentrantLock();
+ private final Lock agentManifestLock = new ReentrantLock();
+ private final Lock deviceLock = new ReentrantLock();
+ private final Lock operationLock = new ReentrantLock();
+
+ @Autowired
+ public StandardC2Service(
+ final AgentPersistenceProvider agentPersistenceProviderFactory,
+ final AgentClassPersistenceProvider agentClassPersistenceProvider,
+ final AgentManifestPersistenceProvider agentManifestPersistenceProvider,
+ final DevicePersistenceProvider devicePersistenceProvider,
+ final OperationPersistenceProvider operationPersistenceProvider,
+ final Validator validator) {
+ this.agentPersistenceProvider = agentPersistenceProviderFactory;
+ this.agentClassPersistenceProvider = agentClassPersistenceProvider;
+ this.agentManifestPersistenceProvider = agentManifestPersistenceProvider;
+ this.devicePersistenceProvider = devicePersistenceProvider;
+ this.operationPersistenceProvider = operationPersistenceProvider;
+ this.validator = validator;
+ }
+
+ private <T> void validate(T t, String invalidMessage) {
+ if (t == null) {
+ throw new IllegalArgumentException(invalidMessage + ". Object cannot be null");
+ }
+
+ final Set<ConstraintViolation<T>> violations = validator.validate(t);
+ if (violations.size() > 0) {
+ throw new ConstraintViolationException(invalidMessage, violations);
+ }
+ }
+
+ //**********************************
+ //*** Agent Class CRUD methods ***
+ //**********************************
+
+ @Override
+ public List<AgentClass> getAgentClasses() {
+ return iterableToList(agentClassPersistenceProvider.getAll());
+ }
+
+ @Override
+ public AgentClass createAgentClass(AgentClass agentClass) {
+ validate(agentClass, "Cannot create agent class");
+
+ agentClassLock.lock();
+ try {
+ if (agentClassPersistenceProvider.existsById(agentClass.getName())) {
+ throw new IllegalStateException(
+ String.format("Agent class not found with name='%s'", agentClass.getName()));
+ }
+ return agentClassPersistenceProvider.save(agentClass);
+ } finally {
+ agentClassLock.unlock();
+ }
+ }
+
+ @Override
+ public Optional<AgentClass> getAgentClass(String name) {
+ if (name == null) {
+ throw new IllegalArgumentException("Name cannot be null");
+ }
+
+ return agentClassPersistenceProvider.getById(name);
+ }
+
+ @Override
+ public AgentClass updateAgentClass(AgentClass agentClass) {
+ validate(agentClass, "Cannot update agent class");
+ agentClassLock.lock();
+ try {
+ if (!agentClassPersistenceProvider.existsById(agentClass.getName())) {
+ throw new ResourceNotFoundException(
+ String.format("Agent class not found with name='%s'", agentClass.getName()));
+ }
+ return agentClassPersistenceProvider.save(agentClass);
+ } finally {
+ agentClassLock.unlock();
+ }
+ }
+
+ @Override
+ public AgentClass deleteAgentClass(String name) {
+ if (name == null) {
+ throw new IllegalArgumentException("Name cannot be null");
+ }
+ agentClassLock.lock();
+ try {
+ Optional<AgentClass> deletedAgentClass = agentClassPersistenceProvider.getById(name);
+ if (!deletedAgentClass.isPresent()) {
+ throw new ResourceNotFoundException(
+ String.format("Agent class not found with name='%s'", name));
+ }
+ agentClassPersistenceProvider.deleteById(name);
+ return deletedAgentClass.get();
+ } finally {
+ agentClassLock.unlock();
+ }
+ }
+
+
+ //*************************************
+ //*** Agent Manifest CRUD methods ***
+ //*************************************
+
+ @Override
+ public AgentManifest createAgentManifest(AgentManifest agentManifest) {
+ validate(agentManifest, "Could not create agent manifest");
+ if (agentManifest.getIdentifier() == null) {
+ agentManifest.setIdentifier(UUID.randomUUID().toString());
+ }
+
+ agentManifestLock.lock();
+ try {
+ if (agentManifestPersistenceProvider.existsById(agentManifest.getIdentifier())) {
+ throw new IllegalStateException(
+ String.format("Agent manifest already exists with identifier='%s", agentManifest.getIdentifier()));
+ }
+ return agentManifestPersistenceProvider.save(agentManifest);
+ } finally {
+ agentManifestLock.unlock();
+ }
+ }
+
+ @Override
+ public List<AgentManifest> getAgentManifests() {
+ return iterableToList(agentManifestPersistenceProvider.getAll());
+ }
+
+ @Override
+ public List<AgentManifest> getAgentManifests(String agentClassName) {
+ if (agentClassName == null) {
+ throw new IllegalArgumentException("Agent class name cannot be null");
+ }
+
+ Optional<AgentClass> agentClass = agentClassPersistenceProvider.getById(agentClassName);
+ if (agentClass.isPresent()) {
+ Iterable<String> manifestIds = agentClass.get().getAgentManifests();
+ if (manifestIds != null) {
+ return iterableToList(agentManifestPersistenceProvider.getAllById(manifestIds));
+ }
+ }
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Optional<AgentManifest> getAgentManifest(String manifestId) {
+ if (manifestId == null) {
+ throw new IllegalArgumentException("Agent manifest id must not be null");
+ }
+
+ return agentManifestPersistenceProvider.getById(manifestId);
+ }
+
+ @Override
+ public AgentManifest deleteAgentManifest(String manifestId) {
+ if (manifestId == null) {
+ throw new IllegalArgumentException("Agent manifest id must not be null");
+ }
+
+ agentManifestLock.lock();
+ try {
+ final Optional<AgentManifest> deletedAgentManifest = agentManifestPersistenceProvider.getById(manifestId);
+ if (!deletedAgentManifest.isPresent()) {
+ throw new ResourceNotFoundException(
+ String.format("Agent manifest with id '%s' not found.", manifestId));
+ }
+ agentManifestPersistenceProvider.deleteById(manifestId);
+ return deletedAgentManifest.get();
+ } finally {
+ agentManifestLock.unlock();
+ }
+ }
+
+
+ //****************************
+ //*** Agent CRUD methods ***
+ //****************************
+
+ @Override
+ public Agent createAgent(Agent agent) {
+ validate(agent, "Cannot create agent");
+ agentLock.lock();
+ try {
+ if (agentPersistenceProvider.existsById(agent.getIdentifier())) {
+ throw new IllegalStateException(
+ String.format("Agent not found with identifier='%s'", agent.getIdentifier()));
+ }
+ return agentPersistenceProvider.save(agent);
+ } finally {
+ agentLock.unlock();
+ }
+ }
+
+ @Override
+ public List<Agent> getAgents() {
+ return iterableToList(agentPersistenceProvider.getAll());
+ }
+
+ @Override
+ public List<Agent> getAgents(String agentClassName) {
+ if (agentClassName == null) {
+ throw new IllegalArgumentException("agentClassName cannot be null");
+ }
+
+ return iterableToList(agentPersistenceProvider.getByClassName(agentClassName));
+ }
+
+ @Override
+ public Optional<Agent> getAgent(String agentId) {
+ if (agentId == null) {
+ throw new IllegalArgumentException("agentId cannot be null");
+ }
+
+ return agentPersistenceProvider.getById(agentId);
+ }
+
+ @Override
+ public Agent updateAgent(Agent agent) {
+ validate(agent, "Cannot update agent");
+
+ agentLock.lock();
+ try {
+ final Optional<Agent> oldAgent = agentPersistenceProvider.getById(agent.getIdentifier());
+ if (!oldAgent.isPresent()) {
+ throw new ResourceNotFoundException("Agent not found with id " + agent.getIdentifier());
+ }
+ agent.setFirstSeen(oldAgent.get().getFirstSeen()); // firstSeen timestamp is immutable
+ return agentPersistenceProvider.save(agent);
+ } finally {
+ agentLock.unlock();
+ }
+ }
+
+ @Override
+ public Agent deleteAgent(String agentId) {
+ if (agentId == null) {
+ throw new IllegalArgumentException("agentId cannot be null");
+ }
+
+ agentLock.lock();
+ try {
+ final Optional<Agent> deletedAgent = agentPersistenceProvider.getById(agentId);
+ if (!deletedAgent.isPresent()) {
+ throw new ResourceNotFoundException("Agent not found with id " + agentId);
+ }
+ agentPersistenceProvider.deleteById(agentId);
+ return deletedAgent.get();
+ } finally {
+ agentLock.unlock();
+ }
+ }
+
+
+ //*****************************
+ //*** Device CRUD methods ***
+ //*****************************
+
+ @Override
+ public Device createDevice(Device device) {
+ validate(device, "Cannot create device");
+ deviceLock.lock();
+ try {
+ if (devicePersistenceProvider.existsById(device.getIdentifier())) {
+ throw new IllegalStateException(
+ String.format("Device already exists with id='%s", device.getIdentifier()));
+ }
+ return devicePersistenceProvider.save(device);
+ } finally {
+ deviceLock.unlock();
+ }
+ }
+
+ @Override
+ public List<Device> getDevices() {
+ return iterableToList(devicePersistenceProvider.getAll());
+ }
+
+ @Override
+ public Optional<Device> getDevice(String deviceId) {
+ if (deviceId == null) {
+ throw new IllegalArgumentException("devicId cannot be null");
+ }
+
+ return devicePersistenceProvider.getById(deviceId);
+ }
+
+ @Override
+ public Device updateDevice(Device device) {
+ validate(device, "Cannot update device");
+
+ deviceLock.lock();
+ try {
+ final Optional<Device> oldDevice = devicePersistenceProvider.getById(device.getIdentifier());
+ if (!oldDevice.isPresent()) {
+ throw new ResourceNotFoundException("Device not found with id " + device.getIdentifier());
+ }
+ device.setFirstSeen(oldDevice.get().getFirstSeen()); // firstSeen timestamp is immutable
+ return devicePersistenceProvider.save(device);
+ } finally {
+ deviceLock.unlock();
+ }
+ }
+
+ @Override
+ public Device deleteDevice(String deviceId) {
+ if (deviceId == null) {
+ throw new IllegalArgumentException("devicId cannot be null");
+ }
+
+ deviceLock.lock();
+ try {
+ final Optional<Device> deletedDevice = devicePersistenceProvider.getById(deviceId);
+ if (!deletedDevice.isPresent()) {
+ throw new ResourceNotFoundException("Device not found with id " + deviceId);
+ }
+ devicePersistenceProvider.deleteById(deviceId);
+ return deletedDevice.get();
+ } finally {
+ deviceLock.unlock();
+ }
+ }
+
+
+ //***********************************
+ //*** C2 Operation CRUD methods ***
+ //***********************************
+
+ @Override
+ public OperationRequest createOperation(OperationRequest operationRequest) {
+ validate(operationRequest, "Cannot create operation");
+ operationRequest.getOperation().setIdentifier(UUID.randomUUID().toString());
+
+ return operationPersistenceProvider.save(operationRequest);
+ }
+
+ @Override
+ public List<OperationRequest> getOperations() {
+ return iterableToList(operationPersistenceProvider.getAll());
+ }
+
+ @Override
+ public List<OperationRequest> getOperationsByAgent(String agentId) {
+ if (agentId == null) {
+ throw new IllegalArgumentException("agentId cannot be null");
+ }
+
+ return iterableToList(operationPersistenceProvider.getByAgent(agentId));
+ }
+
+ @Override
+ public Optional<OperationRequest> getOperation(String operationId) {
+ if (operationId == null) {
+ throw new IllegalArgumentException("operationId cannot be null");
+ }
+
+ return operationPersistenceProvider.getById(operationId);
+ }
+
+ @Override
+ public OperationRequest updateOperationState(String operationId, OperationState state) {
+ if (operationId == null) {
+ throw new IllegalArgumentException("operationId cannot be null");
+ }
+ if (state == null) {
+ throw new IllegalArgumentException("state cannot be null");
+ }
+
+ operationLock.lock();
+ try {
+ final Optional<OperationRequest> existingOperation = operationPersistenceProvider.getById(operationId);
+ if (!existingOperation.isPresent()) {
+ throw new ResourceNotFoundException("Operation not found with id " + operationId);
+ }
+ final OperationRequest updatedOperation = existingOperation.get();
+ logger.debug("C2 operation state transition for operationId={}, fromState={}, toState={}", operationId, updatedOperation.getState(), state);
+ updatedOperation.setState(state);
+ return operationPersistenceProvider.save(updatedOperation);
+ } finally {
+ operationLock.unlock();
+ }
+ }
+
+ @Override
+ public OperationRequest deleteOperation(String operationId) {
+ if (operationId == null) {
+ throw new IllegalArgumentException("operationId cannot be null");
+ }
+
+ operationLock.lock();
+ try {
+ final Optional<OperationRequest> deletedOperation = operationPersistenceProvider.getById(operationId);
+ if (!deletedOperation.isPresent()) {
+ throw new ResourceNotFoundException("Operation not found with id " + operationId);
+ }
+ operationPersistenceProvider.deleteById(operationId);
+ return deletedOperation.get();
+ } finally {
+ operationLock.unlock();
+ }
+ }
+
+ private static <T> List<T> iterableToList(Iterable<T> iterable) {
+ final List<T> retList;
+ if (iterable instanceof List) {
+ retList = (List<T>)iterable;
+ } else if (iterable instanceof Collection) {
+ retList = new ArrayList<>((Collection<T>)iterable);
+ } else {
+ retList = new ArrayList<>();
+ iterable.forEach(retList::add);
+ }
+ return retList;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolServiceSpec.groovy
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolServiceSpec.groovy b/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolServiceSpec.groovy
new file mode 100644
index 0000000..9903068
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolServiceSpec.groovy
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service
+
+import org.apache.nifi.minifi.c2.api.provider.heartbeat.HeartbeatPersistenceProvider
+import org.apache.nifi.minifi.c2.model.*
+import spock.lang.Specification
+
+class StandardC2ProtocolServiceSpec extends Specification {
+
+ def c2Service = Mock(C2Service)
+ def heartbeatPersistenceProvider = Mock(HeartbeatPersistenceProvider)
+ C2ProtocolService c2ProtocolService
+
+ def setup() {
+ c2ProtocolService = new StandardC2ProtocolService(c2Service, heartbeatPersistenceProvider)
+ }
+
+ def "process heartbeat"() {
+
+ setup:
+ C2Heartbeat heartbeat1 = createTestHeartbeat("agent1", "agentClass1")
+ c2Service.getOperationsByAgent("agent1") >> Collections.emptyList()
+
+ C2Heartbeat heartbeat2 = createTestHeartbeat("agent2", "agentClass2")
+ c2Service.getOperationsByAgent("agent2") >> Collections.singletonList(createOperation("agent2", OperationState.QUEUED))
+
+ c2Service.getAgent(_ as String) >> Optional.empty()
+ c2Service.getAgentManifest(_ as String) >> Optional.empty()
+ c2Service.getAgentClass(_ as String) >> Optional.empty()
+ c2Service.getDevice(_ as String) >> Optional.empty()
+
+
+ when: "heartbeat is processed while no operations are queued"
+ def hbResponse1 = c2ProtocolService.processHeartbeat(heartbeat1)
+
+ then: "empty heartbeat response is generated"
+ with(hbResponse1) {
+ requestedOperations == null
+ }
+
+
+ when: "heartbeat is processed while operations are queued"
+ def hbResponse2 = c2ProtocolService.processHeartbeat(heartbeat2)
+
+ then: "heartbeat response is generated with requested operations"
+ with(hbResponse2) {
+ requestedOperations.size() == 1
+ requestedOperations.get(0).operation == OperationType.DESCRIBE
+ }
+
+
+ when: "heartbeat contains new agent manifest"
+ c2ProtocolService.processHeartbeat(heartbeat1)
+
+ then: "the agent manifest is registered"
+ 1 * c2Service.getAgentManifest(_ as String) >> Optional.empty()
+ 1 * c2Service.createAgentManifest(_ as AgentManifest)
+
+
+ when: "heartbeat contains existing agent manifest"
+ c2ProtocolService.processHeartbeat(heartbeat1)
+
+ then: "the agent manifest is not registered"
+ 1 * c2Service.getAgentManifest(_ as String) >> { id -> Optional.of(new AgentManifest([identifier: id])) }
+ 0 * c2Service.createAgentManifest(*_)
+
+
+ when: "heartbeat contains new agent class"
+ c2ProtocolService.processHeartbeat(heartbeat1)
+
+ then: "the agent class is registered"
+ 1 * c2Service.getAgentClass(_ as String) >> Optional.empty()
+ 1 * c2Service.createAgentClass(_ as AgentClass)
+
+
+ when: "heartbeat contains existing agent class"
+ c2ProtocolService.processHeartbeat(heartbeat1)
+
+ then: "the existing agent class is update"
+ 1 * c2Service.getAgentClass(_ as String) >> {name -> Optional.of(new AgentClass([name: name])) }
+ 0 * c2Service.createAgentClass(_ as AgentClass)
+ 1 * c2Service.updateAgentClass(_ as AgentClass)
+
+
+ when: "heartbeat contains new device"
+ c2ProtocolService.processHeartbeat(heartbeat1)
+
+ then: "the device is registered"
+ 1 * c2Service.getDevice(_ as String) >> Optional.empty()
+ 1 * c2Service.createDevice(_ as Device)
+
+
+ when: "heartbeat contains existing device"
+ c2ProtocolService.processHeartbeat(heartbeat1)
+
+ then: "the device is not registered"
+ 1 * c2Service.getDevice(_ as String) >> { id -> Optional.of(new Device([identifier: id])) }
+ 0 * c2Service.createDevice(*_)
+
+ }
+
+ def "process ack"() {
+
+ setup:
+ C2OperationAck ack = new C2OperationAck([operationId: "operation1"])
+
+ when: "operationAck is processed"
+ c2ProtocolService.processOperationAck(ack)
+
+ then: "operation state is updated"
+ 1 * c2Service.updateOperationState("operation1", OperationState.DONE)
+
+ }
+
+
+ // --- Helper methods
+
+ def createTestHeartbeat(String agentId, String agentClass) {
+ return new C2Heartbeat([
+ identifier: "test-heartbeat-id",
+ timestamp: 1514764800000L,
+ deviceInfo: new DeviceInfo([
+ identifier: "test-device-id",
+ ]),
+ agentInfo: new AgentInfo([
+ identifier: agentId,
+ agentClass: agentClass,
+ agentManifest: new AgentManifest([
+ identifier: "test-agent-manifest-id"
+ ])
+ ]),
+ flowInfo: new FlowInfo([
+ flowId: "test-flow-id"
+ ])
+ ])
+ }
+
+ def createOperation(String targetAgentId, OperationState state) {
+ return new OperationRequest([
+ operation: new C2Operation([
+ identifier: "test-operation-id",
+ operation: OperationType.DESCRIBE
+ ]),
+ targetAgentIdentifier: targetAgentId,
+ state: state
+ ])
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ServiceSpec.groovy
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ServiceSpec.groovy b/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ServiceSpec.groovy
new file mode 100644
index 0000000..07c12a3
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ServiceSpec.groovy
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service
+
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentClassPersistenceProvider
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentManifestPersistenceProvider
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentPersistenceProvider
+import org.apache.nifi.minifi.c2.api.provider.device.DevicePersistenceProvider
+import org.apache.nifi.minifi.c2.api.provider.operations.OperationPersistenceProvider
+import org.apache.nifi.minifi.c2.core.exception.ResourceNotFoundException
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileAgentClassPersistenceProvider
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileAgentManifestPersistenceProvider
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileAgentPersistenceProvider
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileDevicePersistenceProvider
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileOperationPersistenceProvider
+import org.apache.nifi.minifi.c2.model.*
+import spock.lang.Specification
+
+import javax.validation.ConstraintViolationException
+import javax.validation.Validation
+
+class StandardC2ServiceSpec extends Specification{
+
+ AgentPersistenceProvider agentPersistenceProvider
+ AgentClassPersistenceProvider agentClassPersistenceProvider
+ AgentManifestPersistenceProvider agentManifestPersistenceProvider
+ DevicePersistenceProvider devicePersistenceProvider
+ OperationPersistenceProvider operationPersistenceProvider
+ C2Service c2Service
+
+ def setup() {
+ agentPersistenceProvider = new VolatileAgentPersistenceProvider()
+ agentClassPersistenceProvider = new VolatileAgentClassPersistenceProvider()
+ agentManifestPersistenceProvider = new VolatileAgentManifestPersistenceProvider()
+ devicePersistenceProvider = new VolatileDevicePersistenceProvider()
+ operationPersistenceProvider = new VolatileOperationPersistenceProvider()
+ def validator = Validation.buildDefaultValidatorFactory().getValidator()
+ c2Service = new StandardC2Service(
+ agentPersistenceProvider,
+ agentClassPersistenceProvider,
+ agentManifestPersistenceProvider,
+ devicePersistenceProvider,
+ operationPersistenceProvider,
+ validator)
+ }
+
+ //**********************************
+ //*** Agent Class CRUD methods ***
+ //**********************************
+
+ def "create agent class"() {
+
+ when: "arg is null"
+ c2Service.createAgentClass(null)
+
+ then: "exception is thrown"
+ thrown IllegalArgumentException
+
+
+ when: "class name is null"
+ c2Service.createAgentClass(new AgentClass())
+
+ then: "exception is thrown"
+ thrown ConstraintViolationException
+
+ when: "valid class is created"
+ def createdClass = c2Service.createAgentClass(new AgentClass([name: "myClass", description: "myDescription"]))
+
+ then: "created class is returned"
+ with(createdClass) {
+ name == "myClass"
+ description == "myDescription"
+ }
+
+ when: "class with same name already exists"
+ c2Service.createAgentClass(new AgentClass([name: "myDupeClass", description: "myDescription1"]))
+ c2Service.createAgentClass(new AgentClass([name: "myDupeClass", description: "myDescription2"]))
+
+ then: "exception is thrown"
+ thrown IllegalStateException
+
+ }
+
+ def "get agent classes"() {
+
+ setup:
+ agentClassPersistenceProvider.save(new AgentClass([name: "class1"]))
+ agentClassPersistenceProvider.save(new AgentClass([name: "class2"]))
+ agentClassPersistenceProvider.save(new AgentClass([name: "class3"]))
+
+ when:
+ def classes = c2Service.getAgentClasses()
+
+ then:
+ classes != null
+ classes.size() == 3
+
+ }
+
+ def "get agent class"() {
+
+ when: "class does not exist"
+ def ac1 = c2Service.getAgentClass("myClass")
+
+ then: "empty optional is returned"
+ !ac1.isPresent()
+
+
+ when: "class does exist"
+ agentClassPersistenceProvider.save(new AgentClass([name: "myClass"]))
+ def ac2 = c2Service.getAgentClass("myClass")
+
+ then: "class is returned"
+ ac2.isPresent()
+ with(ac2.get()) {
+ name == "myClass"
+ }
+
+ }
+
+ def "update agent class"() {
+
+ when: "class does not exist"
+ c2Service.updateAgentClass(new AgentClass([name: "myClass", description: "new description"]))
+
+ then:
+ thrown ResourceNotFoundException
+
+
+ when: "class exists"
+ agentClassPersistenceProvider.save(new AgentClass([name: "myClass"]))
+ def updatedClass = c2Service.updateAgentClass(new AgentClass([name: "myClass", description: "new description"]))
+
+ then:
+ with(updatedClass) {
+ name == "myClass"
+ description == "new description"
+ }
+
+ }
+
+ def "delete agent class"() {
+
+ when: "class does not exist"
+ c2Service.deleteAgent("myClass")
+
+ then:
+ thrown ResourceNotFoundException
+
+
+ when: "class exists"
+ agentClassPersistenceProvider.save(new AgentClass([name: "myClass"]))
+ def deletedClass = c2Service.deleteAgentClass("myClass")
+
+ then:
+ with(deletedClass) {
+ name == "myClass"
+ }
+
+ and: "class no longer exists in persistence provider"
+ agentClassPersistenceProvider.getCount() == 0
+
+ }
+
+
+ //*************************************
+ //*** Agent Manifest CRUD methods ***
+ //*************************************
+
+ def "create agent manifest"() {
+
+ when: "arg is null"
+ c2Service.createAgentManifest(null)
+
+ then: "exception is thrown"
+ thrown IllegalArgumentException
+
+
+ when: "valid manifest is created without client-set id"
+ def created = c2Service.createAgentManifest(new AgentManifest([agentType: "java"]))
+
+ then: "manifest is created and assigned an id"
+ with(created) {
+ identifier != null
+ }
+
+
+ when: "valid manifest is created with client-set id"
+ def created2 = c2Service.createAgentManifest(new AgentManifest([identifier: "manifest1", agentType: "java"]))
+
+ then: "manifest is created with client-set id"
+ with(created2) {
+ identifier == "manifest1"
+ }
+
+
+ when: "manifest is created with client-set id that already exists"
+ c2Service.createAgentManifest(new AgentManifest([identifier: "manifest1", agentType: "java"]))
+ c2Service.createAgentManifest(new AgentManifest([identifier: "manifest1", agentType: "cpp"]))
+
+ then: "exception is thrown"
+ thrown IllegalStateException
+
+ }
+
+ def "get agent manifests"() {
+
+ setup:
+ agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest1"]))
+ agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest2"]))
+ agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest3"]))
+
+ when:
+ def manifests = c2Service.getAgentManifests()
+
+ then:
+ manifests != null
+ manifests.size() == 3
+
+ }
+
+ def "get agent manifests by class name"() {
+
+ setup:
+ agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest1"]))
+ agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest2"]))
+ agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest3"]))
+ agentClassPersistenceProvider.save(new AgentClass([name: "myClass", agentManifests: ["manifest2"]]))
+
+ when:
+ def manifests = c2Service.getAgentManifests("myClass")
+
+ then:
+ manifests != null
+ manifests.size() == 1
+ manifests.get(0).getIdentifier() == "manifest2"
+
+ }
+
+ def "get agent manifest"() {
+
+ when: "manifest does not exist"
+ def manifest1 = c2Service.getAgentManifest("myManifest")
+
+ then: "empty optional is returned"
+ !manifest1.isPresent()
+
+
+ when: "manifest exists"
+ agentManifestPersistenceProvider.save(new AgentManifest([identifier: "myManifest"]))
+ def manifest2 = c2Service.getAgentManifest("myManifest")
+
+ then: "manifest is returned"
+ manifest2.isPresent()
+ with(manifest2.get()) {
+ identifier == "myManifest"
+ }
+
+ }
+
+ def "delete agent manifest"() {
+
+ when: "manifest does not exist"
+ c2Service.deleteAgentManifest("myManifest")
+
+ then: "empty optional is returned"
+ thrown ResourceNotFoundException
+
+
+ when: "manifest exists"
+ agentManifestPersistenceProvider.save(new AgentManifest([identifier: "myManifest"]))
+ def deleted = c2Service.deleteAgentManifest("myManifest")
+
+ then: "manifest is returned"
+ with(deleted) {
+ identifier == "myManifest"
+ }
+
+ and: "manifest no longer exists in persistence provider"
+ agentManifestPersistenceProvider.getCount() == 0
+
+ }
+
+
+ //****************************
+ //*** Agent CRUD methods ***
+ //****************************
+
+ def "create agent"() {
+
+ when: "arg is null"
+ c2Service.createAgent(null)
+
+ then: "exception is thrown"
+ thrown IllegalArgumentException
+
+
+ when: "valid agent is created"
+ def created = c2Service.createAgent(new Agent([identifier: "agent1"]))
+
+ then: "created agent is returned"
+ with(created) {
+ identifier == "agent1"
+ }
+
+
+ when: "agent is created with id that already exists"
+ c2Service.createAgent(new Agent([identifier: "agent1"]))
+ c2Service.createAgent(new Agent([identifier: "agent1"]))
+
+ then: "exception is thrown"
+ thrown IllegalStateException
+
+ }
+
+ def "get agents"() {
+
+ setup:
+ agentPersistenceProvider.save(new Agent([identifier: "agent1"]))
+ agentPersistenceProvider.save(new Agent([identifier: "agent2"]))
+ agentPersistenceProvider.save(new Agent([identifier: "agent3"]))
+
+ when:
+ def agents = c2Service.getAgents()
+
+ then:
+ agents.size() == 3
+
+ }
+
+ def "get agents by class name"() {
+
+ setup:
+ agentPersistenceProvider.save(new Agent([identifier: "agent1"]))
+ agentPersistenceProvider.save(new Agent([identifier: "agent2", agentClass: "myClass"]))
+ agentPersistenceProvider.save(new Agent([identifier: "agent3", agentClass: "yourClass"]))
+
+ when:
+ def agents = c2Service.getAgents("myClass")
+
+ then:
+ agents.size() == 1
+ agents.get(0).identifier == "agent2"
+
+ }
+
+ def "get agent"() {
+
+ when: "agent does not exist"
+ def agent1 = c2Service.getAgent("agent1")
+
+ then: "empty optional is returned"
+ !agent1.isPresent()
+
+
+ when: "agent exists"
+ agentPersistenceProvider.save(new Agent([identifier: "agent2"]))
+ def agent2 = c2Service.getAgent("agent2")
+
+ then: "agent is returned"
+ agent2.isPresent()
+ with(agent2.get()) {
+ identifier == "agent2"
+ }
+
+ }
+
+ def "update agent"() {
+
+ when: "agent does not exist"
+ c2Service.updateAgent(new Agent([identifier: "agent1", name: "a better agent"]))
+
+ then:
+ thrown ResourceNotFoundException
+
+
+ when: "agent exists"
+ agentPersistenceProvider.save(new Agent([identifier: "agent1"]))
+ def updated = c2Service.updateAgent(new Agent([identifier: "agent1", name: "a better agent"]))
+
+ then:
+ with(updated) {
+ identifier == "agent1"
+ name == "a better agent"
+ }
+
+ }
+
+ def "delete agent"() {
+
+ when: "agent does not exist"
+ c2Service.deleteAgent("agent1")
+
+ then:
+ thrown ResourceNotFoundException
+
+
+ when: "agent exists"
+ agentPersistenceProvider.save(new Agent([identifier: "agent1"]))
+ def deleted = c2Service.deleteAgent("agent1")
+
+ then:
+ with(deleted) {
+ identifier == "agent1"
+ }
+
+ }
+
+ //*****************************
+ //*** Device CRUD methods ***
+ //*****************************
+
+ def "create device"() {
+
+ when: "arg is null"
+ c2Service.createDevice(null)
+
+ then: "exception is thrown"
+ thrown IllegalArgumentException
+
+
+ when: "arg is invalid"
+ c2Service.createDevice(new Device())
+
+ then: "exception is thrown"
+ thrown ConstraintViolationException
+
+
+ when: "valid device is created"
+ def created = c2Service.createDevice(new Device([identifier: "device1"]))
+
+ then: "created device is returned"
+ with(created) {
+ identifier == "device1"
+ }
+
+
+ when: "device is created with existing id"
+ c2Service.createDevice(new Device([identifier: "device1"]))
+ c2Service.createDevice(new Device([identifier: "device2"]))
+
+ then: "exception is thrown"
+ thrown IllegalStateException
+
+ }
+
+ def "get devices"() {
+
+ setup:
+ devicePersistenceProvider.save(new Device([identifier: "device1"]))
+ devicePersistenceProvider.save(new Device([identifier: "device2"]))
+ devicePersistenceProvider.save(new Device([identifier: "device3"]))
+
+ when:
+ def devices = c2Service.getDevices()
+
+ then:
+ devices.size() == 3
+
+ }
+
+ def "get device"() {
+
+ when: "device does not exist"
+ def device1 = c2Service.getDevice("device1")
+
+ then: "empty optional is returned"
+ !device1.isPresent()
+
+
+ when: "device exists"
+ devicePersistenceProvider.save(new Device([identifier: "device2"]))
+ def device2 = c2Service.getDevice("device2")
+
+ then: "device is returned"
+ device2.isPresent()
+ with(device2.get()) {
+ identifier == "device2"
+ }
+
+ }
+
+ def "update device"() {
+
+ when: "device does not exist"
+ c2Service.updateDevice(new Device([identifier: "device1", name: "MiNiFi Device"]))
+
+ then:
+ thrown ResourceNotFoundException
+
+
+ when: "agent exists"
+ devicePersistenceProvider.save(new Device([identifier: "device1"]))
+ def updated = c2Service.updateDevice(new Device([identifier: "device1", name: "MiNiFi Device"]))
+
+ then:
+ with(updated) {
+ identifier == "device1"
+ name == "MiNiFi Device"
+ }
+
+ }
+
+ def "delete device"() {
+
+ when: "device does not exist"
+ c2Service.deleteDevice("device1")
+
+ then:
+ thrown ResourceNotFoundException
+
+
+ when: "agent exists"
+ devicePersistenceProvider.save(new Device([identifier: "device1"]))
+ def deleted = c2Service.deleteDevice("device1")
+
+ then:
+ with(deleted) {
+ identifier == "device1"
+ }
+
+ }
+
+
+ //***********************************
+ //*** C2 Operation CRUD methods ***
+ //***********************************
+
+ def "create operaton"() {
+
+ when: "arg is null"
+ c2Service.createOperation(null)
+
+ then: "exception is thrown"
+ thrown IllegalArgumentException
+
+
+ when: "arg is invalid"
+ c2Service.createOperation(new OperationRequest())
+
+ then: "exception is thrown"
+ thrown ConstraintViolationException
+
+
+ when: "valid operation is created"
+ def created = c2Service.createOperation(
+ new OperationRequest([
+ operation: new C2Operation([operation: OperationType.DESCRIBE]),
+ targetAgentIdentifier: "agent1",
+ state: OperationState.NEW
+ ]))
+
+ then: "created operation is returned, generated id"
+ with(created) {
+ targetAgentIdentifier == "agent1"
+ operation.getIdentifier() != null
+ }
+
+ }
+
+ def "get operations"() {
+
+ setup:
+ operationPersistenceProvider.save(new OperationRequest([
+ operation: new C2Operation([identifier: "operation1", operation: OperationType.DESCRIBE]),
+ targetAgentIdentifier: "agent1",
+ state: OperationState.NEW
+ ]))
+ operationPersistenceProvider.save(new OperationRequest([
+ operation: new C2Operation([identifier: "operation2", operation: OperationType.RESTART]),
+ targetAgentIdentifier: "agent2",
+ state: OperationState.NEW
+ ]))
+
+ when: "get all operations"
+ def operations = c2Service.getOperations()
+
+ then:
+ operations.size() == 2
+
+ when: "get operations for agent2"
+ operations = c2Service.getOperationsByAgent("agent2")
+
+ then:
+ operations.size() == 1
+ operations.get(0).operation.identifier == "operation2"
+
+ }
+
+ def "get operation"() {
+
+ when: "operation does not exist"
+ def operation1 = c2Service.getOperation("operation1")
+
+ then: "empty optional is returned"
+ !operation1.isPresent()
+
+
+ when: "operation exists"
+ operationPersistenceProvider.save(new OperationRequest([
+ operation: new C2Operation([identifier: "operation1", operation: OperationType.DESCRIBE]),
+ targetAgentIdentifier: "agent1",
+ state: OperationState.NEW
+ ]))
+ def operation2 = c2Service.getOperation("operation1")
+
+ then: "operation is returned"
+ operation2.isPresent()
+ with(operation2.get()) {
+ operation.identifier == "operation1"
+ }
+
+ }
+
+ def "update operation state"() {
+
+ when: "operation does not exist"
+ c2Service.updateOperationState("operation1", OperationState.DONE)
+
+ then:
+ thrown ResourceNotFoundException
+
+
+ when: "operation exists"
+ operationPersistenceProvider.save(new OperationRequest([
+ operation: new C2Operation([identifier: "operation1", operation: OperationType.DESCRIBE]),
+ targetAgentIdentifier: "agent1",
+ state: OperationState.NEW
+ ]))
+ def updated = c2Service.updateOperationState("operation1", OperationState.DONE)
+
+ then:
+ with(updated) {
+ state == OperationState.DONE
+ }
+
+ }
+
+ def "delete operation"() {
+
+ when: "operation does not exist"
+ c2Service.deleteOperation("operation1")
+
+ then:
+ thrown ResourceNotFoundException
+
+
+ when: "operation exists"
+ operationPersistenceProvider.save(new OperationRequest([
+ operation: new C2Operation([identifier: "operation1", operation: OperationType.DESCRIBE]),
+ targetAgentIdentifier: "agent1",
+ state: OperationState.NEW
+ ]))
+ def deleted = c2Service.deleteOperation("operation1")
+
+ then:
+ with(deleted) {
+ operation.identifier == "operation1"
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-jetty/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-jetty/pom.xml b/minifi-c2/minifi-c2-jetty/pom.xml
index 241b3bb..e76b296 100644
--- a/minifi-c2/minifi-c2-jetty/pom.xml
+++ b/minifi-c2/minifi-c2-jetty/pom.xml
@@ -32,6 +32,7 @@ limitations under the License.
<groupId>org.apache.nifi.minifi</groupId>
<artifactId>minifi-c2-commons</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-provider-api/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-provider-api/pom.xml b/minifi-c2/minifi-c2-provider-api/pom.xml
new file mode 100644
index 0000000..8755a93
--- /dev/null
+++ b/minifi-c2/minifi-c2-provider-api/pom.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>minifi-c2</artifactId>
+ <groupId>org.apache.nifi.minifi</groupId>
+ <version>0.5.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>minifi-c2-provider-api</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/Provider.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/Provider.java b/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/Provider.java
new file mode 100644
index 0000000..f2c5bbb
--- /dev/null
+++ b/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/Provider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.api.provider;
+
+/**
+ * Base interface for providers.
+ */
+public interface Provider {
+
+ /**
+ * Called prior to configuring the Provider in order to discover the prefix for the provider's properties.
+ *
+ * It is recommended that the prefix match the base package for the provider implementation
+ * classes, in order to ensure uniqueness. For example, if your provider uses the package
+ * com.mycompany.nifi-providers.example-provider, that package name can server as the
+ *
+ * If your properties prefix string collides with another configured provider at runtime,
+ * the server will throw a runtime exception and fail to start.
+ */
+ // String getPropertiesPrefix();
+ // ^^^ TODO, this is disabled as it was an experimental look at how to simplify extension provider configuration.
+ // The idea is to combine their config into minifi-c2.properties and allow them to get their config my defining their prefix.
+ // However I'm not sure it can work in all cases as it's not as powerful/flexible as JAXB.
+ // For simple, standalone providers, such as persistence provider, it's probably sufficient,
+ // but for more complex extensions, such as an Authorizers framework, it may be too limiting.
+
+ /**
+ * Called to configure the Provider.
+ *
+ * @param configurationContext the context containing configuration for the given provider
+ * @throws ProviderCreationException if an error occurs while the provider is configured
+ */
+ void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException;
+
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/ProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/ProviderConfigurationContext.java b/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/ProviderConfigurationContext.java
new file mode 100644
index 0000000..59f1364
--- /dev/null
+++ b/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/ProviderConfigurationContext.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.api.provider;
+
+import java.util.Map;
+
+/**
+ * A context that will passed to providers in order to obtain configuration.
+ */
+public interface ProviderConfigurationContext {
+
+ /**
+ * Retrieves all properties prefixed by provider's property prefix.
+ *
+ * @return Map of all properties
+ */
+ Map<String, String> getProperties();
+
+}