You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by kd...@apache.org on 2018/04/02 18:42:11 UTC

[2/3] nifi-minifi git commit: MINIFI-448 Adds C2 Server service layer

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileHeartbeatPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileHeartbeatPersistenceProvider.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileHeartbeatPersistenceProvider.java
new file mode 100644
index 0000000..16a2d1e
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileHeartbeatPersistenceProvider.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.provider.persistence;
+
+import org.apache.nifi.minifi.c2.api.provider.heartbeat.HeartbeatPersistenceProvider;
+import org.apache.nifi.minifi.c2.model.C2Heartbeat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * A simple, in-memory "persistence" provider in order to test the service layer.
+ *
+ * This is not designed for real use outside of development. For example:
+ *   - it only keeps an in-memory record of saved entities, there is no real persistence
+ *   - it does not support transactions
+ *   - it does not clone objects on save/retrieval, so any modifications made after interacting with this service
+ *     also modify the "persisted" copies.
+ *
+ * TODO, deep copy objects on save/get so that they cannot be modified outside this class without modifying the persisted copy.
+ */
+class VolatileHeartbeatPersistenceProvider implements HeartbeatPersistenceProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(VolatileHeartbeatPersistenceProvider.class);
+
+    private Map<String, C2Heartbeat> heartbeats = new ConcurrentHashMap<>();
+
+    @Override
+    public long getCount() {
+        return heartbeats.size();
+    }
+
+    @Override
+    public C2Heartbeat save(C2Heartbeat heartbeat) {
+
+        if (heartbeat == null || heartbeat.getIdentifier() == null) {
+            throw new IllegalArgumentException("Heartbeat must be not null and must have an id in order to be saved.");
+        }
+
+        // TODO, atomic transaction
+        heartbeats.put(heartbeat.getIdentifier(), heartbeat);
+
+        logger.debug("Saved heartbeat with id={}", heartbeat.getIdentifier());
+        return heartbeat;
+
+    }
+
+    @Override
+    public Iterable<C2Heartbeat> getAll() {
+        return new ArrayList<>(heartbeats.values());
+    }
+
+    @Override
+    public Iterable<C2Heartbeat> getByAgent(String agentId) {
+        if (agentId == null) {
+            throw new IllegalArgumentException("Agent id cannot be null");
+        }
+
+        return heartbeats.values().stream()
+                .filter(hb -> hb.getAgentInfo() != null)
+                .filter(hb -> agentId.equals(hb.getAgentInfo().getIdentifier()) )
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean existsById(String heartbeatId) {
+        if (heartbeatId == null) {
+            throw new IllegalArgumentException("Heartbeat id cannot be null");
+        }
+        return heartbeats.containsKey(heartbeatId);
+    }
+
+    @Override
+    public Optional<C2Heartbeat> getById(String heartbeatId) {
+        if (heartbeatId == null) {
+            throw new IllegalArgumentException("Heartbeat id cannot be null");
+        }
+        return Optional.ofNullable(heartbeats.get(heartbeatId));
+    }
+
+    @Override
+    public void deleteById(String heartbeatId) {
+        if (heartbeatId == null) {
+            throw new IllegalArgumentException("Heartbeat id cannot be null");
+        }
+        heartbeats.remove(heartbeatId);
+    }
+
+    @Override
+    public void delete(C2Heartbeat heartbeat) {
+        if (heartbeat == null || heartbeat.getIdentifier() == null) {
+            throw new IllegalArgumentException("heartbeat cannot be null and must have an id");
+        }
+        deleteById(heartbeat.getIdentifier());
+    }
+
+    @Override
+    public void deleteAll() {
+        heartbeats.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileOperationPersistenceProvider.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileOperationPersistenceProvider.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileOperationPersistenceProvider.java
new file mode 100644
index 0000000..fa74f77
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatileOperationPersistenceProvider.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.provider.persistence;
+
+import org.apache.nifi.minifi.c2.api.provider.operations.OperationPersistenceProvider;
+import org.apache.nifi.minifi.c2.model.OperationRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * A simple, in-memory "persistence" provider in order to test the service layer.
+ *
+ * This is not designed for real use outside of development. For example:
+ *   - it only keeps an in-memory record of saved entities, there is no real persistence
+ *   - it does not support transactions
+ *   - it does not clone objects on save/retrieval, so any modifications made after interacting with this service
+ *     also modify the "persisted" copies.
+ *
+ * TODO, deep copy objects on save/get so that they cannot be modified outside this class without modifying the persisted copy.
+ */
+public class VolatileOperationPersistenceProvider implements OperationPersistenceProvider {
+
+    private static final Logger logger = LoggerFactory.getLogger(VolatileOperationPersistenceProvider.class);
+
+    private Map<String, OperationRequest> operations = new ConcurrentHashMap<>();
+
+    @Override
+    public long getCount() {
+        return operations.size();
+    }
+
+
+    @Override
+    public OperationRequest save(OperationRequest operationRequest) {
+        if (operationRequest == null || operationRequest.getOperation() == null || operationRequest.getOperation().getIdentifier() == null) {
+            throw new IllegalArgumentException("operation must be not null and have id");
+        }
+        operations.put(operationRequest.getOperation().getIdentifier(), operationRequest);
+        return operationRequest;
+    }
+
+    @Override
+    public Iterable<OperationRequest> getAll() {
+        return new ArrayList<>(operations.values());
+    }
+
+    @Override
+    public Iterable<OperationRequest> getByAgent(String agentId) {
+        if (agentId == null) {
+            throw new IllegalArgumentException("Agent id cannot be null");
+        }
+        return operations.values().stream()
+                .filter(operation -> agentId.equals(operation.getTargetAgentIdentifier()))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean existsById(String operationId) {
+        if (operationId == null) {
+            throw new IllegalArgumentException("operationId cannot be null");
+        }
+        return operations.containsKey(operationId);
+    }
+
+    @Override
+    public Optional<OperationRequest> getById(String operationId) {
+        if (operationId == null) {
+            throw new IllegalArgumentException("operationId cannot be null");
+        }
+        return Optional.ofNullable(operations.get(operationId));
+    }
+
+    @Override
+    public void deleteById(String operationId) {
+        if (operationId == null) {
+            throw new IllegalArgumentException("operationId cannot be null");
+        }
+        operations.remove(operationId);
+    }
+
+    @Override
+    public void delete(OperationRequest operationRequest) {
+        if (operationRequest == null || operationRequest.getOperation() == null || operationRequest.getOperation().getIdentifier() == null) {
+            throw new IllegalArgumentException("operation must be not null and have id");
+        }
+        operations.remove(operationRequest.getOperation().getIdentifier());
+    }
+
+    @Override
+    public void deleteAll() {
+        operations.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatilePersistenceProviderFactory.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatilePersistenceProviderFactory.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatilePersistenceProviderFactory.java
new file mode 100644
index 0000000..95d584e
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/provider/persistence/VolatilePersistenceProviderFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.provider.persistence;
+
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentClassPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentManifestPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.device.DevicePersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.heartbeat.HeartbeatPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.operations.OperationPersistenceProvider;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * TODO, replace this with a factory that is externally configurable
+ */
+@Configuration
+public class VolatilePersistenceProviderFactory {
+
+    private final VolatileAgentPersistenceProvider agentPersistenceProvider;
+    private final VolatileAgentClassPersistenceProvider agentClassPersistenceProvider;
+    private final VolatileAgentManifestPersistenceProvider agentManifestPersistenceProvider;
+    private final VolatileDevicePersistenceProvider devicePersistenceProvider;
+    private final VolatileHeartbeatPersistenceProvider heartbeatPersistenceProvider;
+    private final VolatileOperationPersistenceProvider operationPersistenceProvider;
+
+    public VolatilePersistenceProviderFactory() {
+        agentPersistenceProvider = new VolatileAgentPersistenceProvider();
+        agentClassPersistenceProvider = new VolatileAgentClassPersistenceProvider();
+        agentManifestPersistenceProvider = new VolatileAgentManifestPersistenceProvider();
+        devicePersistenceProvider = new VolatileDevicePersistenceProvider();
+        heartbeatPersistenceProvider = new VolatileHeartbeatPersistenceProvider();
+        operationPersistenceProvider = new VolatileOperationPersistenceProvider();
+    }
+
+    @Bean
+    public DevicePersistenceProvider getDevicePersistenceProvider() {
+        return devicePersistenceProvider;
+    }
+
+    @Bean
+    public OperationPersistenceProvider getOperationPersistenceProvider() {
+        return operationPersistenceProvider;
+    }
+
+    @Bean
+    public HeartbeatPersistenceProvider getHeartBeatPersistenceProvider() {
+        return heartbeatPersistenceProvider;
+    }
+
+    @Bean
+    public AgentClassPersistenceProvider getAgentClassPersistenceProvider() {
+       return agentClassPersistenceProvider;
+    }
+
+    @Bean
+    public AgentManifestPersistenceProvider getAgentManifestPersistenceProvider() {
+        return agentManifestPersistenceProvider;
+    }
+
+    @Bean
+    public AgentPersistenceProvider getAgentPersistenceProvider() {
+        return agentPersistenceProvider;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2ProtocolService.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2ProtocolService.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2ProtocolService.java
new file mode 100644
index 0000000..54f5a3e
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2ProtocolService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service;
+
+import org.apache.nifi.minifi.c2.model.C2Heartbeat;
+import org.apache.nifi.minifi.c2.model.C2HeartbeatResponse;
+import org.apache.nifi.minifi.c2.model.C2OperationAck;
+
+public interface C2ProtocolService {
+
+    C2HeartbeatResponse processHeartbeat(C2Heartbeat heartbeat);
+
+    void processOperationAck(C2OperationAck operationAck);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java
index c0acb57..05c3f21 100644
--- a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/C2Service.java
@@ -5,9 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -16,44 +14,97 @@
  */
 package org.apache.nifi.minifi.c2.core.service;
 
-import org.apache.nifi.minifi.c2.core.persistence.C2Repository;
-import org.apache.nifi.minifi.c2.model.TestObject;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
+import org.apache.nifi.minifi.c2.model.Agent;
+import org.apache.nifi.minifi.c2.model.AgentClass;
+import org.apache.nifi.minifi.c2.model.AgentManifest;
+import org.apache.nifi.minifi.c2.model.Device;
+import org.apache.nifi.minifi.c2.model.OperationRequest;
+import org.apache.nifi.minifi.c2.model.OperationState;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+
+/**
+ *  Standard CRUD method semantics apply to these methods. That is:
+ *
+ *    - getWidgets: return List of Widget,
+ *                  or empty List if no Widgets exist
+ *
+ *    - getWidget(String): return Optional Widget with matching id,
+ *                         or empty Optional if no Widget with matching id exists
+ *
+ *    - createWidget(Widget): create Widget and assign it a generated id,
+ *                            return created widget (including any fields that got generated such as id or creation timestamp),
+ *                            throw IllegalStateException if Widget with matching id already exists
+ *                            throw IllegalArgumentException if Widget is not valid (e.g., missing required fields)
+ *
+ *    - updateWidget(Widget): update Widget with the id to match the incoming Widget
+ *                            return updated Widget
+ *                            throw IllegalArgumentException if Widget is not valid (e.g., missing required fields. Note, id is required when updating existing Widget)
+ *                            throw ResourceNotFoundException if no Widget with matching id exists
+ *
+ *    - deleteWidget(String): delete Widget with id,
+ *                            return Widget that was deleted,
+ *                            throw ResourceNotFoundException if no Widget with matching id exists
+ *
+ *  Any invalid arguments (eg, null where required) will result in an IllegalArgumentException
+ */
+public interface C2Service {
+
+    //**********************************
+    //***  Agent Class CRUD methods  ***
+    //**********************************
+
+    AgentClass createAgentClass(AgentClass agentClass);
+    List<AgentClass> getAgentClasses();
+    Optional<AgentClass> getAgentClass(String name);
+    AgentClass updateAgentClass(AgentClass agentClass);
+    AgentClass deleteAgentClass(String name);
+
+
+    //*************************************
+    //***  Agent Manifest CRUD methods  ***
+    //*************************************
+
+    AgentManifest createAgentManifest(AgentManifest agentManifest);
+    List<AgentManifest> getAgentManifests();
+    List<AgentManifest> getAgentManifests(String agentClassName);
+    Optional<AgentManifest> getAgentManifest(String manifestId);
+    AgentManifest deleteAgentManifest(String manifestId);
+
 
-@Service
-public class C2Service {
+    //****************************
+    //***  Agent CRUD methods  ***
+    //****************************
 
-    C2Repository c2Repository;
+    Agent createAgent(Agent agent);
+    List<Agent> getAgents();
+    List<Agent> getAgents(String agentClassName);
+    Optional<Agent> getAgent(String agentId);
+    Agent updateAgent(Agent agent);
+    Agent deleteAgent(String agentId);
 
-    @Autowired
-    public C2Service(C2Repository c2Repository) {
-        this.c2Repository = c2Repository;
-    }
 
-    public TestObject createTestObject(TestObject testObject) {
-        return c2Repository.createTestObject(testObject);
-    }
+    //*****************************
+    //***  Device CRUD methods  ***
+    //*****************************
 
-    public List<TestObject> getTestObjects() {
-        List<TestObject> objects = new ArrayList<>();
-        c2Repository.getTestObjects().forEachRemaining(objects::add);
-        return objects;
-    }
+    Device createDevice(Device device);
+    List<Device> getDevices();
+    Optional<Device> getDevice(String deviceId);
+    Device updateDevice(Device device);
+    Device deleteDevice(String deviceId);
 
-    public TestObject getTestObjectById(String identifier) {
-        return c2Repository.getTestObjectById(identifier);
-    }
 
-    public TestObject updateTestObject(TestObject testObject) {
-        return c2Repository.updateTestObject(testObject);
-    }
+    //***********************************
+    //***  C2 Operation CRUD methods  ***
+    //***********************************
 
-    public TestObject deleteTestObject(String identifier) {
-        return c2Repository.deleteTestObject(identifier);
-    }
+    OperationRequest createOperation(OperationRequest operationRequest);
+    List<OperationRequest> getOperations();
+    List<OperationRequest> getOperationsByAgent(String agentId);
+    Optional<OperationRequest> getOperation(String operationId);
+    OperationRequest updateOperationState(String operationId, OperationState state);
+    OperationRequest deleteOperation(String operationId);
 
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolService.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolService.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolService.java
new file mode 100644
index 0000000..60e56b9
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolService.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service;
+
+import org.apache.nifi.minifi.c2.api.provider.heartbeat.HeartbeatPersistenceProvider;
+import org.apache.nifi.minifi.c2.model.Agent;
+import org.apache.nifi.minifi.c2.model.AgentClass;
+import org.apache.nifi.minifi.c2.model.AgentInfo;
+import org.apache.nifi.minifi.c2.model.C2Heartbeat;
+import org.apache.nifi.minifi.c2.model.C2HeartbeatResponse;
+import org.apache.nifi.minifi.c2.model.C2Operation;
+import org.apache.nifi.minifi.c2.model.C2OperationAck;
+import org.apache.nifi.minifi.c2.model.Device;
+import org.apache.nifi.minifi.c2.model.DeviceInfo;
+import org.apache.nifi.minifi.c2.model.OperationRequest;
+import org.apache.nifi.minifi.c2.model.OperationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+@Service
+public class StandardC2ProtocolService implements C2ProtocolService {
+
+    private static final Logger logger = LoggerFactory.getLogger(StandardC2ProtocolService.class);
+
+    private C2Service c2Service;
+    private HeartbeatPersistenceProvider heartbeatPersistenceProvider;
+
+
+    @Autowired
+    public StandardC2ProtocolService(
+            C2Service c2Service,
+            HeartbeatPersistenceProvider heartbeatPersistenceProvider) {
+        this.c2Service = c2Service;
+        this.heartbeatPersistenceProvider = heartbeatPersistenceProvider;
+    }
+
+    @Override
+    public C2HeartbeatResponse processHeartbeat(C2Heartbeat heartbeat) {
+
+        if (heartbeat == null) {
+            throw new IllegalArgumentException("Heartbeat cannot be null");
+        }
+
+        heartbeat.setTimestamp(System.currentTimeMillis());
+        heartbeat.setIdentifier(UUID.randomUUID().toString());
+
+        logger.info("Processing heartbeat: {}", heartbeat.toString());
+
+        persistHeartbeat(heartbeat);
+        processHeartbeatDeviceInfo(heartbeat);
+        processHeartbeatAgentInfo(heartbeat);
+
+        C2HeartbeatResponse response = new C2HeartbeatResponse();
+        List<C2Operation> requestedOperations = new ArrayList<>(getQueuedC2Operations(heartbeat));
+        // TODO, detect if NiFi Registry integration is configured,
+        // and if so, call Flow Retrieval Service to detect if we need to add a Flow Update operation
+        if (!requestedOperations.isEmpty()) {
+            response.setRequestedOperations(requestedOperations);
+            for (C2Operation op : requestedOperations) {
+                try {
+                    c2Service.updateOperationState(op.getIdentifier(), OperationState.DEPLOYED);
+                } catch (Exception e) {
+                    logger.warn("Encountered exception while updating operation state", e);
+                }
+            }
+        }
+
+        return response;
+    }
+
+    @Override
+    public void processOperationAck(C2OperationAck operationAck) {
+
+        try {
+            // TODO, add operation status (eg success/failed) to operationAck. For now, assume ack indicates successful execution.
+            c2Service.updateOperationState(operationAck.getOperationId(), OperationState.DONE);
+        } catch (Exception e) {
+            logger.warn("Encountered exception while processing operation ack", e);
+        }
+
+    }
+
+    private void persistHeartbeat(C2Heartbeat heartbeat) {
+        try {
+            heartbeatPersistenceProvider.save(heartbeat);
+        } catch (Exception e) {
+            logger.warn("Encountered exception while trying to record heartbeat", e);
+        }
+    }
+
+    private void processHeartbeatDeviceInfo(C2Heartbeat heartbeat) {
+        try {
+            final String deviceIdentifier;
+            final DeviceInfo deviceInfo = heartbeat.getDeviceInfo();
+            if (deviceInfo != null) {
+                deviceIdentifier = deviceInfo.getIdentifier();
+
+                if (deviceIdentifier == null) {
+                    logger.info("Could not register device without identifier: {} ", deviceInfo);
+                    return;
+                }
+
+                logger.debug("Creating/updating device info for deviceId={}", deviceIdentifier);
+                Optional<Device> existingDevice = c2Service.getDevice(deviceIdentifier);
+                boolean deviceExists = (existingDevice.isPresent());
+                Device device = existingDevice.orElse(new Device());
+                if (!deviceExists) {
+                    device.setIdentifier(deviceIdentifier);
+                    device.setFirstSeen(heartbeat.getTimestamp());
+                }
+                device.setLastSeen(heartbeat.getTimestamp());
+                device.setSystemInfo(deviceInfo.getSystemInfo());
+                device.setNetworkInfo(deviceInfo.getNetworkInfo());
+
+                if (!deviceExists) {
+                    c2Service.createDevice(device);
+                } else {
+                    c2Service.updateDevice(device);
+                }
+            }
+        } catch (Exception e) {
+            logger.warn("Encountered exception while trying to update device info", e);
+        }
+    }
+
+    private void processHeartbeatAgentInfo(C2Heartbeat heartbeat) {
+        try {
+            final String agentIdentifier;
+            final AgentInfo agentInfo = heartbeat.getAgentInfo();
+            if (agentInfo != null) {
+                agentIdentifier = agentInfo.getIdentifier();
+
+                if (agentIdentifier == null) {
+                    logger.info("Could not register agent without identifier: {} ", agentInfo);
+                    return;
+                }
+
+                logger.debug("Creating/updating agent info for agentId={}", agentIdentifier);
+                Optional<Agent> existingAgent = c2Service.getAgent(agentIdentifier);
+                boolean agentExists = existingAgent.isPresent();
+                final Agent agent = existingAgent.orElse(new Agent());
+                if (!agentExists) {
+                    agent.setIdentifier(agentIdentifier);
+                    agent.setFirstSeen(heartbeat.getTimestamp());
+                }
+                agent.setLastSeen(heartbeat.getTimestamp());
+                if (agentInfo.getAgentClass() != null) {
+                    agent.setAgentClass(agentInfo.getAgentClass());
+                }
+                if (agentInfo.getAgentManifest() != null) {
+                    agent.setAgentManifest(agentInfo.getAgentManifest());
+                }
+                if (agentInfo.getStatus() != null) {
+                    agent.setStatus(agentInfo.getStatus());
+                }
+
+                // Create agent manifest if this is the first time we've seen it
+                String agentManifestIdentifier = null;
+                if (agentInfo.getAgentManifest() != null) {
+                    agent.setAgentManifest(agentInfo.getAgentManifest());
+                    agentManifestIdentifier = agent.getAgentManifest().getIdentifier();
+
+                    // Note, a client-set agent manifest identifier is required so that we don't create infinite manifests
+                    // Alternatively, we need some way of deterministically generating a manifest id from the manifest contents,
+                    // So that two heartbeats with a matching agent manifest do not register separate manifests
+                    if (!c2Service.getAgentManifest(agentManifestIdentifier).isPresent()) {
+                        c2Service.createAgentManifest(agent.getAgentManifest());
+                    }
+                }
+
+                // Create agent class if this is the first time we've seen it
+                if (agentInfo.getAgentClass() != null) {
+                    agent.setAgentClass(agentInfo.getAgentClass());
+                    Optional<AgentClass> existingAgentClass = c2Service.getAgentClass(agent.getAgentClass());
+                    if (existingAgentClass.isPresent()) {
+                        if (agentManifestIdentifier != null) {
+                            AgentClass updatedAgentClass = existingAgentClass.get();
+                            Set<String> agentManifests = updatedAgentClass.getAgentManifests();
+                            if (agentManifests == null) {
+                                agentManifests = new HashSet<>();
+                            }
+                            agentManifests.add(agentManifestIdentifier);
+                            c2Service.updateAgentClass(updatedAgentClass);
+                        }
+                    } else {
+                        AgentClass newAgentClass = new AgentClass();
+                        newAgentClass.setName(agent.getAgentClass());
+                        newAgentClass.setAgentManifests(agentManifestIdentifier != null ? new HashSet<>(Collections.singleton(agentManifestIdentifier)) : null);
+                        c2Service.createAgentClass(newAgentClass);
+                    }
+                }
+
+                if (!agentExists) {
+                    c2Service.createAgent(agent);
+                } else {
+                    c2Service.updateAgent(agent);
+                }
+            }
+        } catch (Exception e) {
+            logger.warn("Encountered exception while trying to update agent info", e);
+        }
+    }
+
+    private List<C2Operation> getQueuedC2Operations(C2Heartbeat heartbeat) {
+
+        final List<C2Operation> queuedC2Operations = new ArrayList<>();
+        if (heartbeat.getAgentInfo() != null) {
+            final String agentIdentifier = heartbeat.getAgentInfo().getIdentifier();
+            if (agentIdentifier != null) {
+                for (OperationRequest op : c2Service.getOperationsByAgent(agentIdentifier)) {
+                    if (op.getState() == OperationState.QUEUED) {
+                        queuedC2Operations.add(op.getOperation());
+                    }
+                }
+            }
+        }
+        return queuedC2Operations;
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2Service.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2Service.java b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2Service.java
new file mode 100644
index 0000000..31d070d
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/main/java/org/apache/nifi/minifi/c2/core/service/StandardC2Service.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service;
+
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentClassPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentManifestPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentPersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.device.DevicePersistenceProvider;
+import org.apache.nifi.minifi.c2.api.provider.operations.OperationPersistenceProvider;
+import org.apache.nifi.minifi.c2.core.exception.ResourceNotFoundException;
+import org.apache.nifi.minifi.c2.model.Agent;
+import org.apache.nifi.minifi.c2.model.AgentClass;
+import org.apache.nifi.minifi.c2.model.AgentManifest;
+import org.apache.nifi.minifi.c2.model.Device;
+import org.apache.nifi.minifi.c2.model.OperationRequest;
+import org.apache.nifi.minifi.c2.model.OperationState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.validation.ConstraintViolation;
+import javax.validation.ConstraintViolationException;
+import javax.validation.Validator;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+@Service
+public class StandardC2Service implements C2Service {
+
+    private static final Logger logger = LoggerFactory.getLogger(StandardC2Service.class);
+
+    private final AgentPersistenceProvider agentPersistenceProvider;
+    private final AgentClassPersistenceProvider agentClassPersistenceProvider;
+    private final AgentManifestPersistenceProvider agentManifestPersistenceProvider;
+    private final DevicePersistenceProvider devicePersistenceProvider;
+    private final OperationPersistenceProvider operationPersistenceProvider;
+    private final Validator validator;
+
+    private final Lock agentLock = new ReentrantLock();
+    private final Lock agentClassLock = new ReentrantLock();
+    private final Lock agentManifestLock = new ReentrantLock();
+    private final Lock deviceLock = new ReentrantLock();
+    private final Lock operationLock = new ReentrantLock();
+
+    @Autowired
+    public StandardC2Service(
+            final AgentPersistenceProvider agentPersistenceProviderFactory,
+            final AgentClassPersistenceProvider agentClassPersistenceProvider,
+            final AgentManifestPersistenceProvider agentManifestPersistenceProvider,
+            final DevicePersistenceProvider devicePersistenceProvider,
+            final OperationPersistenceProvider operationPersistenceProvider,
+            final Validator validator) {
+        this.agentPersistenceProvider = agentPersistenceProviderFactory;
+        this.agentClassPersistenceProvider = agentClassPersistenceProvider;
+        this.agentManifestPersistenceProvider = agentManifestPersistenceProvider;
+        this.devicePersistenceProvider = devicePersistenceProvider;
+        this.operationPersistenceProvider = operationPersistenceProvider;
+        this.validator = validator;
+    }
+
+    private <T> void validate(T t, String invalidMessage) {
+        if (t == null) {
+            throw new IllegalArgumentException(invalidMessage + ". Object cannot be null");
+        }
+
+        final Set<ConstraintViolation<T>> violations = validator.validate(t);
+        if (violations.size() > 0) {
+            throw new ConstraintViolationException(invalidMessage, violations);
+        }
+    }
+
+    //**********************************
+    //***  Agent Class CRUD methods  ***
+    //**********************************
+
+    @Override
+    public List<AgentClass> getAgentClasses() {
+        return iterableToList(agentClassPersistenceProvider.getAll());
+    }
+
+    @Override
+    public AgentClass createAgentClass(AgentClass agentClass) {
+        validate(agentClass, "Cannot create agent class");
+
+        agentClassLock.lock();
+        try {
+            if (agentClassPersistenceProvider.existsById(agentClass.getName())) {
+                throw new IllegalStateException(
+                        String.format("Agent class not found with name='%s'", agentClass.getName()));
+            }
+            return agentClassPersistenceProvider.save(agentClass);
+        } finally {
+            agentClassLock.unlock();
+        }
+    }
+
+    @Override
+    public Optional<AgentClass> getAgentClass(String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("Name cannot be null");
+        }
+
+        return agentClassPersistenceProvider.getById(name);
+    }
+
+    @Override
+    public AgentClass updateAgentClass(AgentClass agentClass) {
+        validate(agentClass, "Cannot update agent class");
+        agentClassLock.lock();
+        try {
+            if (!agentClassPersistenceProvider.existsById(agentClass.getName())) {
+                throw new ResourceNotFoundException(
+                        String.format("Agent class not found with name='%s'", agentClass.getName()));
+            }
+            return agentClassPersistenceProvider.save(agentClass);
+        } finally {
+            agentClassLock.unlock();
+        }
+    }
+
+    @Override
+    public AgentClass deleteAgentClass(String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("Name cannot be null");
+        }
+        agentClassLock.lock();
+        try {
+            Optional<AgentClass> deletedAgentClass = agentClassPersistenceProvider.getById(name);
+            if (!deletedAgentClass.isPresent()) {
+                throw new ResourceNotFoundException(
+                        String.format("Agent class not found with name='%s'", name));
+            }
+            agentClassPersistenceProvider.deleteById(name);
+            return deletedAgentClass.get();
+        } finally {
+            agentClassLock.unlock();
+        }
+    }
+
+
+    //*************************************
+    //***  Agent Manifest CRUD methods  ***
+    //*************************************
+
+    @Override
+    public AgentManifest createAgentManifest(AgentManifest agentManifest) {
+        validate(agentManifest, "Could not create agent manifest");
+        if (agentManifest.getIdentifier() == null) {
+            agentManifest.setIdentifier(UUID.randomUUID().toString());
+        }
+
+        agentManifestLock.lock();
+        try {
+            if (agentManifestPersistenceProvider.existsById(agentManifest.getIdentifier())) {
+                throw new IllegalStateException(
+                        String.format("Agent manifest already exists with identifier='%s", agentManifest.getIdentifier()));
+            }
+            return agentManifestPersistenceProvider.save(agentManifest);
+        } finally {
+            agentManifestLock.unlock();
+        }
+    }
+
+    @Override
+    public List<AgentManifest> getAgentManifests() {
+        return iterableToList(agentManifestPersistenceProvider.getAll());
+    }
+
+    @Override
+    public List<AgentManifest> getAgentManifests(String agentClassName) {
+        if (agentClassName == null) {
+            throw new IllegalArgumentException("Agent class name cannot be null");
+        }
+
+        Optional<AgentClass> agentClass = agentClassPersistenceProvider.getById(agentClassName);
+        if (agentClass.isPresent()) {
+            Iterable<String> manifestIds = agentClass.get().getAgentManifests();
+            if (manifestIds != null) {
+                return iterableToList(agentManifestPersistenceProvider.getAllById(manifestIds));
+            }
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public Optional<AgentManifest> getAgentManifest(String manifestId) {
+        if (manifestId == null) {
+            throw new IllegalArgumentException("Agent manifest id must not be null");
+        }
+
+        return agentManifestPersistenceProvider.getById(manifestId);
+    }
+
+    @Override
+    public AgentManifest deleteAgentManifest(String manifestId) {
+        if (manifestId == null) {
+            throw new IllegalArgumentException("Agent manifest id must not be null");
+        }
+
+        agentManifestLock.lock();
+        try {
+            final Optional<AgentManifest> deletedAgentManifest = agentManifestPersistenceProvider.getById(manifestId);
+            if (!deletedAgentManifest.isPresent()) {
+                throw new ResourceNotFoundException(
+                        String.format("Agent manifest with id '%s' not found.", manifestId));
+            }
+            agentManifestPersistenceProvider.deleteById(manifestId);
+            return deletedAgentManifest.get();
+        } finally {
+            agentManifestLock.unlock();
+        }
+    }
+
+
+    //****************************
+    //***  Agent CRUD methods  ***
+    //****************************
+
+    @Override
+    public Agent createAgent(Agent agent) {
+        validate(agent, "Cannot create agent");
+        agentLock.lock();
+        try {
+            if (agentPersistenceProvider.existsById(agent.getIdentifier())) {
+                throw new IllegalStateException(
+                        String.format("Agent not found with identifier='%s'", agent.getIdentifier()));
+            }
+            return agentPersistenceProvider.save(agent);
+        } finally {
+            agentLock.unlock();
+        }
+    }
+
+    @Override
+    public List<Agent> getAgents() {
+        return iterableToList(agentPersistenceProvider.getAll());
+    }
+
+    @Override
+    public List<Agent> getAgents(String agentClassName) {
+        if (agentClassName == null) {
+            throw new IllegalArgumentException("agentClassName cannot be null");
+        }
+
+        return iterableToList(agentPersistenceProvider.getByClassName(agentClassName));
+    }
+
+    @Override
+    public Optional<Agent> getAgent(String agentId) {
+        if (agentId == null) {
+            throw new IllegalArgumentException("agentId cannot be null");
+        }
+
+        return agentPersistenceProvider.getById(agentId);
+    }
+
+    @Override
+    public Agent updateAgent(Agent agent) {
+        validate(agent, "Cannot update agent");
+
+        agentLock.lock();
+        try {
+            final Optional<Agent> oldAgent = agentPersistenceProvider.getById(agent.getIdentifier());
+            if (!oldAgent.isPresent()) {
+                throw new ResourceNotFoundException("Agent not found with id " + agent.getIdentifier());
+            }
+            agent.setFirstSeen(oldAgent.get().getFirstSeen());  // firstSeen timestamp is immutable
+            return agentPersistenceProvider.save(agent);
+        } finally {
+            agentLock.unlock();
+        }
+    }
+
+    @Override
+    public Agent deleteAgent(String agentId) {
+        if (agentId == null) {
+            throw new IllegalArgumentException("agentId cannot be null");
+        }
+
+        agentLock.lock();
+        try {
+            final Optional<Agent> deletedAgent = agentPersistenceProvider.getById(agentId);
+            if (!deletedAgent.isPresent()) {
+                throw new ResourceNotFoundException("Agent not found with id " + agentId);
+            }
+            agentPersistenceProvider.deleteById(agentId);
+            return deletedAgent.get();
+        } finally {
+            agentLock.unlock();
+        }
+    }
+
+
+    //*****************************
+    //***  Device CRUD methods  ***
+    //*****************************
+
+    @Override
+    public Device createDevice(Device device) {
+        validate(device, "Cannot create device");
+        deviceLock.lock();
+        try {
+            if (devicePersistenceProvider.existsById(device.getIdentifier())) {
+                throw new IllegalStateException(
+                        String.format("Device already exists with id='%s", device.getIdentifier()));
+            }
+            return devicePersistenceProvider.save(device);
+        } finally {
+            deviceLock.unlock();
+        }
+    }
+
+    @Override
+    public List<Device> getDevices() {
+        return iterableToList(devicePersistenceProvider.getAll());
+    }
+
+    @Override
+    public Optional<Device> getDevice(String deviceId) {
+        if (deviceId == null) {
+            throw new IllegalArgumentException("devicId cannot be null");
+        }
+
+        return devicePersistenceProvider.getById(deviceId);
+    }
+
+    @Override
+    public Device updateDevice(Device device) {
+        validate(device, "Cannot update device");
+
+        deviceLock.lock();
+        try {
+            final Optional<Device> oldDevice = devicePersistenceProvider.getById(device.getIdentifier());
+            if (!oldDevice.isPresent()) {
+                throw new ResourceNotFoundException("Device not found with id " + device.getIdentifier());
+            }
+            device.setFirstSeen(oldDevice.get().getFirstSeen());  // firstSeen timestamp is immutable
+            return devicePersistenceProvider.save(device);
+        } finally {
+            deviceLock.unlock();
+        }
+    }
+
+    @Override
+    public Device deleteDevice(String deviceId) {
+        if (deviceId == null) {
+            throw new IllegalArgumentException("devicId cannot be null");
+        }
+
+        deviceLock.lock();
+        try {
+            final Optional<Device> deletedDevice = devicePersistenceProvider.getById(deviceId);
+            if (!deletedDevice.isPresent()) {
+                throw new ResourceNotFoundException("Device not found with id " + deviceId);
+            }
+            devicePersistenceProvider.deleteById(deviceId);
+            return deletedDevice.get();
+        } finally {
+            deviceLock.unlock();
+        }
+    }
+
+
+    //***********************************
+    //***  C2 Operation CRUD methods  ***
+    //***********************************
+
+    @Override
+    public OperationRequest createOperation(OperationRequest operationRequest) {
+        validate(operationRequest, "Cannot create operation");
+        operationRequest.getOperation().setIdentifier(UUID.randomUUID().toString());
+
+        return operationPersistenceProvider.save(operationRequest);
+    }
+
+    @Override
+    public List<OperationRequest> getOperations() {
+        return iterableToList(operationPersistenceProvider.getAll());
+    }
+
+    @Override
+    public List<OperationRequest> getOperationsByAgent(String agentId) {
+        if (agentId == null) {
+            throw new IllegalArgumentException("agentId cannot be null");
+        }
+
+        return iterableToList(operationPersistenceProvider.getByAgent(agentId));
+    }
+
+    @Override
+    public Optional<OperationRequest> getOperation(String operationId) {
+        if (operationId == null) {
+            throw new IllegalArgumentException("operationId cannot be null");
+        }
+
+        return operationPersistenceProvider.getById(operationId);
+    }
+
+    @Override
+    public OperationRequest updateOperationState(String operationId, OperationState state) {
+        if (operationId == null) {
+            throw new IllegalArgumentException("operationId cannot be null");
+        }
+        if (state == null) {
+            throw new IllegalArgumentException("state cannot be null");
+        }
+
+        operationLock.lock();
+        try {
+            final Optional<OperationRequest> existingOperation = operationPersistenceProvider.getById(operationId);
+            if (!existingOperation.isPresent()) {
+                throw new ResourceNotFoundException("Operation not found with id " + operationId);
+            }
+            final OperationRequest updatedOperation = existingOperation.get();
+            logger.debug("C2 operation state transition for operationId={}, fromState={}, toState={}", operationId, updatedOperation.getState(), state);
+            updatedOperation.setState(state);
+            return operationPersistenceProvider.save(updatedOperation);
+        } finally {
+            operationLock.unlock();
+        }
+    }
+
+    @Override
+    public OperationRequest deleteOperation(String operationId) {
+        if (operationId == null) {
+            throw new IllegalArgumentException("operationId cannot be null");
+        }
+
+        operationLock.lock();
+        try {
+            final Optional<OperationRequest> deletedOperation = operationPersistenceProvider.getById(operationId);
+            if (!deletedOperation.isPresent()) {
+                throw new ResourceNotFoundException("Operation not found with id " + operationId);
+            }
+            operationPersistenceProvider.deleteById(operationId);
+            return deletedOperation.get();
+        } finally {
+            operationLock.unlock();
+        }
+    }
+
+    private static <T> List<T> iterableToList(Iterable<T> iterable) {
+        final List<T> retList;
+        if (iterable instanceof List) {
+           retList = (List<T>)iterable;
+        } else if (iterable instanceof Collection) {
+            retList = new ArrayList<>((Collection<T>)iterable);
+        } else {
+            retList = new ArrayList<>();
+            iterable.forEach(retList::add);
+        }
+        return retList;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolServiceSpec.groovy
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolServiceSpec.groovy b/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolServiceSpec.groovy
new file mode 100644
index 0000000..9903068
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ProtocolServiceSpec.groovy
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service
+
+import org.apache.nifi.minifi.c2.api.provider.heartbeat.HeartbeatPersistenceProvider
+import org.apache.nifi.minifi.c2.model.*
+import spock.lang.Specification
+
+class StandardC2ProtocolServiceSpec extends Specification {
+
+    def c2Service = Mock(C2Service)
+    def heartbeatPersistenceProvider = Mock(HeartbeatPersistenceProvider)
+    C2ProtocolService c2ProtocolService
+
+    def setup() {
+        c2ProtocolService = new StandardC2ProtocolService(c2Service, heartbeatPersistenceProvider)
+    }
+
+    def "process heartbeat"() {
+
+        setup:
+        C2Heartbeat heartbeat1 = createTestHeartbeat("agent1", "agentClass1")
+        c2Service.getOperationsByAgent("agent1") >> Collections.emptyList()
+
+        C2Heartbeat heartbeat2 = createTestHeartbeat("agent2", "agentClass2")
+        c2Service.getOperationsByAgent("agent2") >> Collections.singletonList(createOperation("agent2", OperationState.QUEUED))
+
+        c2Service.getAgent(_ as String) >> Optional.empty()
+        c2Service.getAgentManifest(_ as String) >> Optional.empty()
+        c2Service.getAgentClass(_ as String) >> Optional.empty()
+        c2Service.getDevice(_ as String) >> Optional.empty()
+
+
+        when: "heartbeat is processed while no operations are queued"
+        def hbResponse1 = c2ProtocolService.processHeartbeat(heartbeat1)
+
+        then: "empty heartbeat response is generated"
+        with(hbResponse1) {
+            requestedOperations == null
+        }
+
+
+        when: "heartbeat is processed while operations are queued"
+        def hbResponse2 = c2ProtocolService.processHeartbeat(heartbeat2)
+
+        then: "heartbeat response is generated with requested operations"
+        with(hbResponse2) {
+            requestedOperations.size() == 1
+            requestedOperations.get(0).operation == OperationType.DESCRIBE
+        }
+
+
+        when: "heartbeat contains new agent manifest"
+        c2ProtocolService.processHeartbeat(heartbeat1)
+
+        then: "the agent manifest is registered"
+        1 * c2Service.getAgentManifest(_ as String) >> Optional.empty()
+        1 * c2Service.createAgentManifest(_ as AgentManifest)
+
+
+        when: "heartbeat contains existing agent manifest"
+        c2ProtocolService.processHeartbeat(heartbeat1)
+
+        then: "the agent manifest is not registered"
+        1 * c2Service.getAgentManifest(_ as String) >> { id -> Optional.of(new AgentManifest([identifier: id])) }
+        0 * c2Service.createAgentManifest(*_)
+
+
+        when: "heartbeat contains new agent class"
+        c2ProtocolService.processHeartbeat(heartbeat1)
+
+        then: "the agent class is registered"
+        1 * c2Service.getAgentClass(_ as String) >> Optional.empty()
+        1 * c2Service.createAgentClass(_ as AgentClass)
+
+
+        when: "heartbeat contains existing agent class"
+        c2ProtocolService.processHeartbeat(heartbeat1)
+
+        then: "the existing agent class is update"
+        1 * c2Service.getAgentClass(_ as String) >> {name -> Optional.of(new AgentClass([name: name])) }
+        0 * c2Service.createAgentClass(_ as AgentClass)
+        1 * c2Service.updateAgentClass(_ as AgentClass)
+
+
+        when: "heartbeat contains new device"
+        c2ProtocolService.processHeartbeat(heartbeat1)
+
+        then: "the device is registered"
+        1 * c2Service.getDevice(_ as String) >> Optional.empty()
+        1 * c2Service.createDevice(_ as Device)
+
+
+        when: "heartbeat contains existing device"
+        c2ProtocolService.processHeartbeat(heartbeat1)
+
+        then: "the device is not registered"
+        1 * c2Service.getDevice(_ as String) >> { id -> Optional.of(new Device([identifier: id])) }
+        0 * c2Service.createDevice(*_)
+
+    }
+
+    def "process ack"() {
+
+        setup:
+        C2OperationAck ack = new C2OperationAck([operationId: "operation1"])
+
+        when: "operationAck is processed"
+        c2ProtocolService.processOperationAck(ack)
+
+        then: "operation state is updated"
+        1 * c2Service.updateOperationState("operation1", OperationState.DONE)
+
+    }
+
+
+        // --- Helper methods
+
+    def createTestHeartbeat(String agentId, String agentClass) {
+        return new C2Heartbeat([
+                identifier: "test-heartbeat-id",
+                timestamp: 1514764800000L,
+                deviceInfo: new DeviceInfo([
+                        identifier: "test-device-id",
+                ]),
+                agentInfo: new AgentInfo([
+                        identifier: agentId,
+                        agentClass: agentClass,
+                        agentManifest: new AgentManifest([
+                                identifier: "test-agent-manifest-id"
+                        ])
+                ]),
+                flowInfo: new FlowInfo([
+                        flowId: "test-flow-id"
+                ])
+        ])
+    }
+
+    def createOperation(String targetAgentId, OperationState state) {
+        return new OperationRequest([
+                operation: new C2Operation([
+                        identifier: "test-operation-id",
+                        operation: OperationType.DESCRIBE
+                ]),
+                targetAgentIdentifier: targetAgentId,
+                state: state
+        ])
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ServiceSpec.groovy
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ServiceSpec.groovy b/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ServiceSpec.groovy
new file mode 100644
index 0000000..07c12a3
--- /dev/null
+++ b/minifi-c2/minifi-c2-framework/src/test/groovy/org/apache/nifi/minifi/c2/core/service/StandardC2ServiceSpec.groovy
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.core.service
+
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentClassPersistenceProvider
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentManifestPersistenceProvider
+import org.apache.nifi.minifi.c2.api.provider.agent.AgentPersistenceProvider
+import org.apache.nifi.minifi.c2.api.provider.device.DevicePersistenceProvider
+import org.apache.nifi.minifi.c2.api.provider.operations.OperationPersistenceProvider
+import org.apache.nifi.minifi.c2.core.exception.ResourceNotFoundException
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileAgentClassPersistenceProvider
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileAgentManifestPersistenceProvider
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileAgentPersistenceProvider
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileDevicePersistenceProvider
+import org.apache.nifi.minifi.c2.core.provider.persistence.VolatileOperationPersistenceProvider
+import org.apache.nifi.minifi.c2.model.*
+import spock.lang.Specification
+
+import javax.validation.ConstraintViolationException
+import javax.validation.Validation
+
+class StandardC2ServiceSpec extends Specification{
+
+    AgentPersistenceProvider agentPersistenceProvider
+    AgentClassPersistenceProvider agentClassPersistenceProvider
+    AgentManifestPersistenceProvider agentManifestPersistenceProvider
+    DevicePersistenceProvider devicePersistenceProvider
+    OperationPersistenceProvider operationPersistenceProvider
+    C2Service c2Service
+
+    def setup() {
+        agentPersistenceProvider = new VolatileAgentPersistenceProvider()
+        agentClassPersistenceProvider = new VolatileAgentClassPersistenceProvider()
+        agentManifestPersistenceProvider = new VolatileAgentManifestPersistenceProvider()
+        devicePersistenceProvider = new VolatileDevicePersistenceProvider()
+        operationPersistenceProvider = new VolatileOperationPersistenceProvider()
+        def validator = Validation.buildDefaultValidatorFactory().getValidator()
+        c2Service = new StandardC2Service(
+                agentPersistenceProvider,
+                agentClassPersistenceProvider,
+                agentManifestPersistenceProvider,
+                devicePersistenceProvider,
+                operationPersistenceProvider,
+                validator)
+    }
+
+    //**********************************
+    //***  Agent Class CRUD methods  ***
+    //**********************************
+
+    def "create agent class"() {
+
+        when: "arg is null"
+        c2Service.createAgentClass(null)
+
+        then: "exception is thrown"
+        thrown IllegalArgumentException
+
+
+        when: "class name is null"
+        c2Service.createAgentClass(new AgentClass())
+
+        then: "exception is thrown"
+        thrown ConstraintViolationException
+
+        when: "valid class is created"
+        def createdClass = c2Service.createAgentClass(new AgentClass([name: "myClass", description: "myDescription"]))
+
+        then: "created class is returned"
+        with(createdClass) {
+            name == "myClass"
+            description == "myDescription"
+        }
+
+        when: "class with same name already exists"
+        c2Service.createAgentClass(new AgentClass([name: "myDupeClass", description: "myDescription1"]))
+        c2Service.createAgentClass(new AgentClass([name: "myDupeClass", description: "myDescription2"]))
+
+        then: "exception is thrown"
+        thrown IllegalStateException
+
+    }
+
+    def "get agent classes"() {
+
+        setup:
+        agentClassPersistenceProvider.save(new AgentClass([name: "class1"]))
+        agentClassPersistenceProvider.save(new AgentClass([name: "class2"]))
+        agentClassPersistenceProvider.save(new AgentClass([name: "class3"]))
+
+        when:
+        def classes = c2Service.getAgentClasses()
+
+        then:
+        classes != null
+        classes.size() == 3
+
+    }
+
+    def "get agent class"() {
+
+        when: "class does not exist"
+        def ac1 = c2Service.getAgentClass("myClass")
+
+        then: "empty optional is returned"
+        !ac1.isPresent()
+
+
+        when: "class does exist"
+        agentClassPersistenceProvider.save(new AgentClass([name: "myClass"]))
+        def ac2 = c2Service.getAgentClass("myClass")
+
+        then: "class is returned"
+        ac2.isPresent()
+        with(ac2.get()) {
+            name == "myClass"
+        }
+
+    }
+
+    def "update agent class"() {
+
+        when: "class does not exist"
+        c2Service.updateAgentClass(new AgentClass([name: "myClass", description: "new description"]))
+
+        then:
+        thrown ResourceNotFoundException
+
+
+        when: "class exists"
+        agentClassPersistenceProvider.save(new AgentClass([name: "myClass"]))
+        def updatedClass = c2Service.updateAgentClass(new AgentClass([name: "myClass", description: "new description"]))
+
+        then:
+        with(updatedClass) {
+            name == "myClass"
+            description == "new description"
+        }
+
+    }
+
+    def "delete agent class"() {
+
+        when: "class does not exist"
+        c2Service.deleteAgent("myClass")
+
+        then:
+        thrown ResourceNotFoundException
+
+
+        when: "class exists"
+        agentClassPersistenceProvider.save(new AgentClass([name: "myClass"]))
+        def deletedClass = c2Service.deleteAgentClass("myClass")
+
+        then:
+        with(deletedClass) {
+            name == "myClass"
+        }
+
+        and: "class no longer exists in persistence provider"
+        agentClassPersistenceProvider.getCount() == 0
+
+    }
+
+
+    //*************************************
+    //***  Agent Manifest CRUD methods  ***
+    //*************************************
+
+    def "create agent manifest"() {
+
+        when: "arg is null"
+        c2Service.createAgentManifest(null)
+
+        then: "exception is thrown"
+        thrown IllegalArgumentException
+
+
+        when: "valid manifest is created without client-set id"
+        def created = c2Service.createAgentManifest(new AgentManifest([agentType: "java"]))
+
+        then: "manifest is created and assigned an id"
+        with(created) {
+            identifier != null
+        }
+
+
+        when: "valid manifest is created with client-set id"
+        def created2 = c2Service.createAgentManifest(new AgentManifest([identifier: "manifest1", agentType: "java"]))
+
+        then: "manifest is created with client-set id"
+        with(created2) {
+            identifier == "manifest1"
+        }
+
+
+        when: "manifest is created with client-set id that already exists"
+        c2Service.createAgentManifest(new AgentManifest([identifier: "manifest1", agentType: "java"]))
+        c2Service.createAgentManifest(new AgentManifest([identifier: "manifest1", agentType: "cpp"]))
+
+        then: "exception is thrown"
+        thrown IllegalStateException
+
+    }
+
+    def "get agent manifests"() {
+
+        setup:
+        agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest1"]))
+        agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest2"]))
+        agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest3"]))
+
+        when:
+        def manifests = c2Service.getAgentManifests()
+
+        then:
+        manifests != null
+        manifests.size() == 3
+
+    }
+
+    def "get agent manifests by class name"() {
+
+        setup:
+        agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest1"]))
+        agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest2"]))
+        agentManifestPersistenceProvider.save(new AgentManifest([identifier: "manifest3"]))
+        agentClassPersistenceProvider.save(new AgentClass([name: "myClass", agentManifests: ["manifest2"]]))
+
+        when:
+        def manifests = c2Service.getAgentManifests("myClass")
+
+        then:
+        manifests != null
+        manifests.size() == 1
+        manifests.get(0).getIdentifier() == "manifest2"
+
+    }
+
+    def "get agent manifest"() {
+
+        when: "manifest does not exist"
+        def manifest1 = c2Service.getAgentManifest("myManifest")
+
+        then: "empty optional is returned"
+        !manifest1.isPresent()
+
+
+        when: "manifest exists"
+        agentManifestPersistenceProvider.save(new AgentManifest([identifier: "myManifest"]))
+        def manifest2 = c2Service.getAgentManifest("myManifest")
+
+        then: "manifest is returned"
+        manifest2.isPresent()
+        with(manifest2.get()) {
+            identifier == "myManifest"
+        }
+
+    }
+
+    def "delete agent manifest"() {
+
+        when: "manifest does not exist"
+        c2Service.deleteAgentManifest("myManifest")
+
+        then: "empty optional is returned"
+        thrown ResourceNotFoundException
+
+
+        when: "manifest exists"
+        agentManifestPersistenceProvider.save(new AgentManifest([identifier: "myManifest"]))
+        def deleted = c2Service.deleteAgentManifest("myManifest")
+
+        then: "manifest is returned"
+        with(deleted) {
+            identifier == "myManifest"
+        }
+
+        and: "manifest no longer exists in persistence provider"
+        agentManifestPersistenceProvider.getCount() == 0
+
+    }
+
+
+    //****************************
+    //***  Agent CRUD methods  ***
+    //****************************
+
+    def "create agent"() {
+
+        when: "arg is null"
+        c2Service.createAgent(null)
+
+        then: "exception is thrown"
+        thrown IllegalArgumentException
+
+
+        when: "valid agent is created"
+        def created = c2Service.createAgent(new Agent([identifier: "agent1"]))
+
+        then: "created agent is returned"
+        with(created) {
+            identifier == "agent1"
+        }
+
+
+        when: "agent is created with id that already exists"
+        c2Service.createAgent(new Agent([identifier: "agent1"]))
+        c2Service.createAgent(new Agent([identifier: "agent1"]))
+
+        then: "exception is thrown"
+        thrown IllegalStateException
+
+    }
+
+    def "get agents"() {
+
+        setup:
+        agentPersistenceProvider.save(new Agent([identifier: "agent1"]))
+        agentPersistenceProvider.save(new Agent([identifier: "agent2"]))
+        agentPersistenceProvider.save(new Agent([identifier: "agent3"]))
+
+        when:
+        def agents = c2Service.getAgents()
+
+        then:
+        agents.size() == 3
+
+    }
+
+    def "get agents by class name"() {
+
+        setup:
+        agentPersistenceProvider.save(new Agent([identifier: "agent1"]))
+        agentPersistenceProvider.save(new Agent([identifier: "agent2", agentClass: "myClass"]))
+        agentPersistenceProvider.save(new Agent([identifier: "agent3", agentClass: "yourClass"]))
+
+        when:
+        def agents = c2Service.getAgents("myClass")
+
+        then:
+        agents.size() == 1
+        agents.get(0).identifier == "agent2"
+
+    }
+
+    def "get agent"() {
+
+        when: "agent does not exist"
+        def agent1 = c2Service.getAgent("agent1")
+
+        then: "empty optional is returned"
+        !agent1.isPresent()
+
+
+        when: "agent exists"
+        agentPersistenceProvider.save(new Agent([identifier: "agent2"]))
+        def agent2 = c2Service.getAgent("agent2")
+
+        then: "agent is returned"
+        agent2.isPresent()
+        with(agent2.get()) {
+            identifier == "agent2"
+        }
+
+    }
+
+    def "update agent"() {
+
+        when: "agent does not exist"
+        c2Service.updateAgent(new Agent([identifier: "agent1", name: "a better agent"]))
+
+        then:
+        thrown ResourceNotFoundException
+
+
+        when: "agent exists"
+        agentPersistenceProvider.save(new Agent([identifier: "agent1"]))
+        def updated = c2Service.updateAgent(new Agent([identifier: "agent1", name: "a better agent"]))
+
+        then:
+        with(updated) {
+            identifier == "agent1"
+            name == "a better agent"
+        }
+
+    }
+
+    def "delete agent"() {
+
+        when: "agent does not exist"
+        c2Service.deleteAgent("agent1")
+
+        then:
+        thrown ResourceNotFoundException
+
+
+        when: "agent exists"
+        agentPersistenceProvider.save(new Agent([identifier: "agent1"]))
+        def deleted = c2Service.deleteAgent("agent1")
+
+        then:
+        with(deleted) {
+            identifier == "agent1"
+        }
+
+    }
+
+    //*****************************
+    //***  Device CRUD methods  ***
+    //*****************************
+
+    def "create device"() {
+
+        when: "arg is null"
+        c2Service.createDevice(null)
+
+        then: "exception is thrown"
+        thrown IllegalArgumentException
+
+
+        when: "arg is invalid"
+        c2Service.createDevice(new Device())
+
+        then: "exception is thrown"
+        thrown ConstraintViolationException
+
+
+        when: "valid device is created"
+        def created = c2Service.createDevice(new Device([identifier: "device1"]))
+
+        then: "created device is returned"
+        with(created) {
+            identifier == "device1"
+        }
+
+
+        when: "device is created with existing id"
+        c2Service.createDevice(new Device([identifier: "device1"]))
+        c2Service.createDevice(new Device([identifier: "device2"]))
+
+        then: "exception is thrown"
+        thrown IllegalStateException
+
+    }
+
+    def "get devices"() {
+
+        setup:
+        devicePersistenceProvider.save(new Device([identifier: "device1"]))
+        devicePersistenceProvider.save(new Device([identifier: "device2"]))
+        devicePersistenceProvider.save(new Device([identifier: "device3"]))
+
+        when:
+        def devices = c2Service.getDevices()
+
+        then:
+        devices.size() == 3
+
+    }
+
+    def "get device"() {
+
+        when: "device does not exist"
+        def device1 = c2Service.getDevice("device1")
+
+        then: "empty optional is returned"
+        !device1.isPresent()
+
+
+        when: "device exists"
+        devicePersistenceProvider.save(new Device([identifier: "device2"]))
+        def device2 = c2Service.getDevice("device2")
+
+        then: "device is returned"
+        device2.isPresent()
+        with(device2.get()) {
+            identifier == "device2"
+        }
+
+    }
+
+    def "update device"() {
+
+        when: "device does not exist"
+        c2Service.updateDevice(new Device([identifier: "device1", name: "MiNiFi Device"]))
+
+        then:
+        thrown ResourceNotFoundException
+
+
+        when: "agent exists"
+        devicePersistenceProvider.save(new Device([identifier: "device1"]))
+        def updated = c2Service.updateDevice(new Device([identifier: "device1", name: "MiNiFi Device"]))
+
+        then:
+        with(updated) {
+            identifier == "device1"
+            name == "MiNiFi Device"
+        }
+
+    }
+
+    def "delete device"() {
+
+        when: "device does not exist"
+        c2Service.deleteDevice("device1")
+
+        then:
+        thrown ResourceNotFoundException
+
+
+        when: "agent exists"
+        devicePersistenceProvider.save(new Device([identifier: "device1"]))
+        def deleted = c2Service.deleteDevice("device1")
+
+        then:
+        with(deleted) {
+            identifier == "device1"
+        }
+
+    }
+
+
+    //***********************************
+    //***  C2 Operation CRUD methods  ***
+    //***********************************
+
+    def "create operaton"() {
+
+        when: "arg is null"
+        c2Service.createOperation(null)
+
+        then: "exception is thrown"
+        thrown IllegalArgumentException
+
+
+        when: "arg is invalid"
+        c2Service.createOperation(new OperationRequest())
+
+        then: "exception is thrown"
+        thrown ConstraintViolationException
+
+
+        when: "valid operation is created"
+        def created = c2Service.createOperation(
+                new OperationRequest([
+                        operation: new C2Operation([operation: OperationType.DESCRIBE]),
+                        targetAgentIdentifier: "agent1",
+                        state: OperationState.NEW
+                ]))
+
+        then: "created operation is returned, generated id"
+        with(created) {
+            targetAgentIdentifier == "agent1"
+            operation.getIdentifier() != null
+        }
+
+    }
+
+    def "get operations"() {
+
+        setup:
+        operationPersistenceProvider.save(new OperationRequest([
+                operation: new C2Operation([identifier: "operation1", operation: OperationType.DESCRIBE]),
+                targetAgentIdentifier: "agent1",
+                state: OperationState.NEW
+        ]))
+        operationPersistenceProvider.save(new OperationRequest([
+                operation: new C2Operation([identifier: "operation2", operation: OperationType.RESTART]),
+                targetAgentIdentifier: "agent2",
+                state: OperationState.NEW
+        ]))
+
+        when: "get all operations"
+        def operations = c2Service.getOperations()
+
+        then:
+        operations.size() == 2
+
+        when: "get operations for agent2"
+        operations = c2Service.getOperationsByAgent("agent2")
+
+        then:
+        operations.size() == 1
+        operations.get(0).operation.identifier == "operation2"
+
+    }
+
+    def "get operation"() {
+
+        when: "operation does not exist"
+        def operation1 = c2Service.getOperation("operation1")
+
+        then: "empty optional is returned"
+        !operation1.isPresent()
+
+
+        when: "operation exists"
+        operationPersistenceProvider.save(new OperationRequest([
+                operation: new C2Operation([identifier: "operation1", operation: OperationType.DESCRIBE]),
+                targetAgentIdentifier: "agent1",
+                state: OperationState.NEW
+        ]))
+        def operation2 = c2Service.getOperation("operation1")
+
+        then: "operation is returned"
+        operation2.isPresent()
+        with(operation2.get()) {
+            operation.identifier == "operation1"
+        }
+
+    }
+
+    def "update operation state"() {
+
+        when: "operation does not exist"
+        c2Service.updateOperationState("operation1", OperationState.DONE)
+
+        then:
+        thrown ResourceNotFoundException
+
+
+        when: "operation exists"
+        operationPersistenceProvider.save(new OperationRequest([
+                operation: new C2Operation([identifier: "operation1", operation: OperationType.DESCRIBE]),
+                targetAgentIdentifier: "agent1",
+                state: OperationState.NEW
+        ]))
+        def updated = c2Service.updateOperationState("operation1", OperationState.DONE)
+
+        then:
+        with(updated) {
+            state == OperationState.DONE
+        }
+
+    }
+
+    def "delete operation"() {
+
+        when: "operation does not exist"
+        c2Service.deleteOperation("operation1")
+
+        then:
+        thrown ResourceNotFoundException
+
+
+        when: "operation exists"
+        operationPersistenceProvider.save(new OperationRequest([
+                operation: new C2Operation([identifier: "operation1", operation: OperationType.DESCRIBE]),
+                targetAgentIdentifier: "agent1",
+                state: OperationState.NEW
+        ]))
+        def deleted = c2Service.deleteOperation("operation1")
+
+        then:
+        with(deleted) {
+            operation.identifier == "operation1"
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-jetty/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-jetty/pom.xml b/minifi-c2/minifi-c2-jetty/pom.xml
index 241b3bb..e76b296 100644
--- a/minifi-c2/minifi-c2-jetty/pom.xml
+++ b/minifi-c2/minifi-c2-jetty/pom.xml
@@ -32,6 +32,7 @@ limitations under the License.
             <groupId>org.apache.nifi.minifi</groupId>
             <artifactId>minifi-c2-commons</artifactId>
             <version>${project.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-provider-api/pom.xml
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-provider-api/pom.xml b/minifi-c2/minifi-c2-provider-api/pom.xml
new file mode 100644
index 0000000..8755a93
--- /dev/null
+++ b/minifi-c2/minifi-c2-provider-api/pom.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>minifi-c2</artifactId>
+        <groupId>org.apache.nifi.minifi</groupId>
+        <version>0.5.0-SNAPSHOT</version>
+    </parent>
+    <artifactId>minifi-c2-provider-api</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+    </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/Provider.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/Provider.java b/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/Provider.java
new file mode 100644
index 0000000..f2c5bbb
--- /dev/null
+++ b/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/Provider.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.api.provider;
+
+/**
+ * Base interface for providers.
+ */
+public interface Provider {
+
+    /**
+     * Called prior to configuring the Provider in order to discover the prefix for the provider's properties.
+     *
+     * It is recommended that the prefix match the base package for the provider implementation
+     * classes, in order to ensure uniqueness. For example, if your provider uses the package
+     * com.mycompany.nifi-providers.example-provider, that package name can server as the
+     *
+     * If your properties prefix string collides with another configured provider at runtime,
+     * the server will throw a runtime exception and fail to start.
+     */
+    // String getPropertiesPrefix();
+    // ^^^ TODO, this is disabled as it was an experimental look at how to simplify extension provider configuration.
+    // The idea is to combine their config into minifi-c2.properties and allow them to get their config my defining their prefix.
+    // However I'm not sure it can work in all cases as it's not as powerful/flexible as JAXB.
+    // For simple, standalone providers, such as persistence provider, it's probably sufficient,
+    // but for more complex extensions, such as an Authorizers framework, it may be too limiting.
+
+    /**
+     * Called to configure the Provider.
+     *
+     * @param configurationContext the context containing configuration for the given provider
+     * @throws ProviderCreationException if an error occurs while the provider is configured
+     */
+    void onConfigured(ProviderConfigurationContext configurationContext) throws ProviderCreationException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/11bb8dbe/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/ProviderConfigurationContext.java
----------------------------------------------------------------------
diff --git a/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/ProviderConfigurationContext.java b/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/ProviderConfigurationContext.java
new file mode 100644
index 0000000..59f1364
--- /dev/null
+++ b/minifi-c2/minifi-c2-provider-api/src/main/java/org/apache/nifi/minifi/c2/api/provider/ProviderConfigurationContext.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.api.provider;
+
+import java.util.Map;
+
+/**
+ * A context that will passed to providers in order to obtain configuration.
+ */
+public interface ProviderConfigurationContext {
+
+    /**
+     * Retrieves all properties prefixed by provider's property prefix.
+     *
+     * @return Map of all properties
+     */
+    Map<String, String> getProperties();
+
+}