Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/03/03 21:08:55 UTC

[GitHub] [nifi] kevdoran commented on a change in pull request #5755: NIFI-9538: Add C2 heartbeat capability to minifi-c2-service

kevdoran commented on a change in pull request #5755:
URL: https://github.com/apache/nifi/pull/5755#discussion_r819058724



##########
File path: minifi/minifi-c2/minifi-c2-assembly/pom.xml
##########
@@ -26,7 +26,7 @@ limitations under the License.
     <packaging>pom</packaging>
     <description>This is the assembly of Apache MiNiFi's - Command And Control Server</description>
     <properties>
-        <minifi.c2.server.port>10080</minifi.c2.server.port>
+        <minifi.c2.server.port>10090</minifi.c2.server.port>

Review comment:
       The corresponding [Dockerhub](https://github.com/apache/nifi/blob/7a16619969a90655b5aed6bf938fa0a3019ab673/minifi/minifi-c2/minifi-c2-docker/dockerhub/Dockerfile#L50) and [Dockermaven](https://github.com/apache/nifi/blob/7a16619969a90655b5aed6bf938fa0a3019ab673/minifi/minifi-c2/minifi-c2-docker/dockermaven/Dockerfile#L41) Dockerfiles also need to be updated.

##########
File path: minifi/minifi-c2/minifi-c2-service/src/main/java/org/apache/nifi/minifi/c2/service/SimpleC2ProtocolService.java
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.c2.service;
+
+import org.apache.nifi.c2.protocol.api.C2Heartbeat;
+import org.apache.nifi.c2.protocol.api.C2HeartbeatResponse;
+import org.apache.nifi.c2.protocol.api.C2Operation;
+import org.apache.nifi.c2.protocol.api.C2OperationAck;
+import org.apache.nifi.c2.protocol.api.C2OperationState;
+import org.apache.nifi.c2.protocol.api.OperandType;
+import org.apache.nifi.c2.protocol.api.OperationState;
+import org.apache.nifi.c2.protocol.api.OperationType;
+import org.apache.nifi.minifi.c2.util.LogMarkerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+@Service
+public class SimpleC2ProtocolService implements C2ProtocolService {
+
+    private static final Logger logger = LoggerFactory.getLogger(SimpleC2ProtocolService.class);
+
+    private static final Set<String> issuedOperationIds = new HashSet<>();
+
+    public SimpleC2ProtocolService() {
+    }
+
+    @Override
+    public void processOperationAck(final C2OperationAck operationAck, final C2ProtocolContext context) {
+        // This service assumes there is a single Operation UPDATE to pass over the updated flow
+        Marker marker = LogMarkerUtil.getMarker(operationAck);
+
+        logger.debug(marker, "Received operation acknowledgement: {}; {}", operationAck, context);
+        // Remove the operation ID from the set of issued operations and log the state
+        final String operationId = operationAck.getOperationId();
+        try {
+            OperationState opState = OperationState.DONE;
+            String details = null;
+
+            /* Partial applications are rare and only happen when an operation consists of updating multiple config
+             * items and only some succeed (we don't yet have the concept of rollback in agents).
+             * Fully Applied yields an operation success.
+             * Operation Not Understood and Not Applied give little detail but also result in an operation failure.
+             * We should explore providing textual details. */
+            final C2OperationState c2OperationState = operationAck.getOperationState();
+            if (null != c2OperationState) {
+                details = c2OperationState.getDetails();
+                if (c2OperationState.getState() != C2OperationState.OperationState.FULLY_APPLIED) {
+                    opState = OperationState.FAILED;
+                }
+            }
+
+            if (!issuedOperationIds.remove(operationId)) {
+                logger.warn(marker, "Operation with ID " + operationId + " has either already been acknowledged or is unknown to this server");
+            } else if (null != c2OperationState) {
+                final C2OperationState.OperationState operationState = c2OperationState.getState();
+                logger.debug("Operation with ID " + operationId + " acknowledged with a state of " + operationState.name() + "(" + opState.name() + "), details = "
+                        + (details == null ? "" : details));
+            }
+
+            // Optionally, an acknowledgement can include some of the info normally passed in a heartbeat.
+            // If this info is present, process it as a heartbeat, so we update our latest known state of the agent.
+            if (operationAck.getAgentInfo() != null
+                    || operationAck.getDeviceInfo() != null
+                    || operationAck.getFlowInfo() != null) {
+                final C2Heartbeat heartbeatInfo = toHeartbeat(operationAck);
+                logger.trace(marker, "Operation acknowledgement contains additional info. Processing as heartbeat: {}", heartbeatInfo);
+                processHeartbeat(heartbeatInfo, context);
+            }
+
+        } catch (final Exception e) {
+            logger.warn("Encountered exception while processing operation ack", e);
+        }
+    }
+
+    @Override
+    public C2HeartbeatResponse processHeartbeat(final C2Heartbeat heartbeat, final C2ProtocolContext context) {

Review comment:
       I agree with Csaba; this feels wrong to me. At the very least, it would be good to compare the flow info in the incoming heartbeat against the flow the server intends the agent to run, and send the update flow operation only when they differ. Otherwise, the agent will need to add logic to determine "is this change necessary?"
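
       A minimal sketch of the kind of comparison meant here, written as a standalone helper so it does not presume anything about the real C2 protocol classes. `FlowUpdateCheck`, `needsFlowUpdate`, `nextOperationId`, and the idea of comparing plain string flow IDs are all assumptions for illustration; the actual heartbeat flow info may expose different fields.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper, not part of the NiFi C2 API.
final class FlowUpdateCheck {

    private FlowUpdateCheck() {
    }

    /**
     * Decides whether an update flow operation should be sent.
     *
     * @param reportedFlowId flow ID reported in the heartbeat (assumed field; null if absent)
     * @param targetFlowId   flow ID the server wants the agent to run (null if none assigned)
     */
    static boolean needsFlowUpdate(final String reportedFlowId, final String targetFlowId) {
        if (targetFlowId == null) {
            // Nothing is assigned to this agent, so there is nothing to push.
            return false;
        }
        // Send the update only when the agent is not already running the target flow.
        return !targetFlowId.equals(reportedFlowId);
    }

    /**
     * Returns a fresh operation ID when an update is needed, or empty when the
     * heartbeat response should carry no update flow operation.
     */
    static Optional<String> nextOperationId(final String reportedFlowId, final String targetFlowId) {
        return needsFlowUpdate(reportedFlowId, targetFlowId)
                ? Optional.of(UUID.randomUUID().toString())
                : Optional.empty();
    }
}
```

       With a check like this on the server side, `processHeartbeat` would only add the operation (and record its ID as issued) when `nextOperationId` returns a value, so an agent that is already up to date receives an empty response.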



