You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/16 21:39:38 UTC

[GitHub] [nifi] turcsanyip commented on a change in pull request #4821: NIFI-8113 Adding persistent status history repository backed by embed…

turcsanyip commented on a change in pull request #4821:
URL: https://github.com/apache/nifi/pull/4821#discussion_r577129664



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
##########
@@ -206,6 +206,10 @@ The following binary components are provided under the Apache Software License v
         Bytes Utility Library 1.3.0
         Copyright 2017 Patrick Favre-Bulle
 
+  (ASLv2) QuestDB (questdb-5.0.5.jar - https://github.com/questdb/questdb)

Review comment:
       The version does not match the one in the POM. The same applies to the other NOTICE file.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbWritingTemplate.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.TableWriter;
+import io.questdb.griffin.SqlExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+
+/**
+ * Template for writing entries into QuestDb.
+ *
+ * @param <T> The type of the entry.
+ */
+public abstract class QuestDbWritingTemplate<T> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(QuestDbWritingTemplate.class);
+
+    private final String tableName;
+
+    /**
+     * @param tableName Name of the target table.
+     */
+    protected QuestDbWritingTemplate(final String tableName) {
+        this.tableName = tableName;
+    }
+
+    /**
+     * Inserts the entries into the database.
+     *
+     * @param engine QuestDB engine.
+     * @param context Execution context.
+     * @param entries Entries to insert.
+     */
+    public void insert(final CairoEngine engine, final SqlExecutionContext context, final Collection<T> entries) {
+        if (entries.isEmpty()) {
+            return;
+        }
+
+        try (
+            final TableWriter tableWriter = engine.getWriter(context.getCairoSecurityContext(), tableName);
+        ) {
+            addRows(tableWriter, entries);
+            tableWriter.commit();
+        } catch (final Exception e) {
+            LOGGER.error("Error happened during writing into table " + tableName, e);
+            e.printStackTrace();

Review comment:
       Please remove `printStackTrace()`.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
##########
@@ -16,412 +16,86 @@
  */
 package org.apache.nifi.controller.status.history;
 
-import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.NodeStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.util.ComponentMetrics;
 import org.apache.nifi.util.NiFiProperties;
-import org.apache.nifi.util.RingBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
-public class VolatileComponentStatusRepository implements ComponentStatusRepository {
-    private static final Logger logger = LoggerFactory.getLogger(VolatileComponentStatusRepository.class);
-
-    private static final Set<MetricDescriptor<?>> DEFAULT_PROCESSOR_METRICS = Arrays.stream(ProcessorStatusDescriptor.values())
-        .map(ProcessorStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<?>> DEFAULT_CONNECTION_METRICS = Arrays.stream(ConnectionStatusDescriptor.values())
-        .map(ConnectionStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<?>> DEFAULT_GROUP_METRICS = Arrays.stream(ProcessGroupStatusDescriptor.values())
-        .map(ProcessGroupStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<?>> DEFAULT_RPG_METRICS = Arrays.stream(RemoteProcessGroupStatusDescriptor.values())
-        .map(RemoteProcessGroupStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-    private static final Set<MetricDescriptor<NodeStatus>> DEFAULT_NODE_METRICS = Arrays.stream(NodeStatusDescriptor.values())
-        .map(NodeStatusDescriptor::getDescriptor)
-        .collect(Collectors.toSet());
-
-    private static final String STORAGE_FREE_DESCRIPTION = "The usable space available for use by the underlying storage mechanism.";
-    private static final String STORAGE_USED_DESCRIPTION = "The space in use on the underlying storage mechanism";
-
-    private static final String GC_TIME_DESCRIPTION = "The sum time the garbage collection has run since the start of the Java virtual machine.";
-    private static final String GC_TIME_DIFF_DESCRIPTION = "The sum time the garbage collection has run since the last measurement.";
-    private static final String GC_COUNT_DESCRIPTION = "The sum amount of occasions the garbage collection has run since the start of the Java virtual machine.";
-    private static final String GC_COUNT_DIFF_DESCRIPTION = "The sum amount of occasions the garbage collection has run since the last measurement.";
-
-    public static final String NUM_DATA_POINTS_PROPERTY = "nifi.components.status.repository.buffer.size";
-    public static final int DEFAULT_NUM_DATA_POINTS = 288;   // 1 day worth of 5-minute snapshots
-
-    private final Map<String, ComponentStatusHistory> componentStatusHistories = new HashMap<>();
+/**
+ * Consists of in memory repositories for node and component status history.
+ * Necessary in order to support existing way of NiFi configuration.
+ */
+@Deprecated
+public class VolatileComponentStatusRepository implements StatusRepository {
 
-    // Changed to protected to allow unit testing
-    protected final RingBuffer<Date> timestamps;
-    private final RingBuffer<List<GarbageCollectionStatus>> gcStatuses;
-    private final RingBuffer<NodeStatus> nodeStatuses;
-    private final int numDataPoints;
-    private volatile long lastCaptureTime = 0L;
+    private final NodeStatusRepository nodeStatusRepository;
+    private final ComponentStatusRepository componentStatusRepository;

Review comment:
       As `VolatileComponentStatusRepository` provides backward compatibility for the in-memory storage and, in the background, just delegates requests to the new `InMemory*StatusRepositories`, using those classes instead of the interface types would fit the role of this class better.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,40 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status Repository
+
+# Implementation based specification. Must be the classname of org.apache.nifi.controller.status.history.StatusRepository implementation.
+# This property specifies both the Component and the Node Status Repositories.
+# This approach takes precedence over the builder based specification if presents.
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Builder based specification. Gives the possibility to store Node and Component Status History information in different storage solutions.
+# nifi.status.repository.builder.inmemory=org.apache.nifi.controller.status.history.InMemoryStatusRepositoryBuilder
+# nifi.status.repository.builder.persistent=org.apache.nifi.controller.status.history.EmbeddedQuestDbStatusRepositoryBuilder
+# nifi.status.repository.roles.component=persistent
+# nifi.status.repository.roles.node=inmemory

Review comment:
       I believe the new InMemory implementation should be the default instead of adding these lines commented out.
   As far as I understand, the backward compatibility point here is to support old `nifi.properties` files where only the `nifi.components.status.repository.implementation` property exists. That support is provided and works even if the old property is not the default.
   New installations could (should) go with the new configuration / implementation classes.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentStatusRepositoryFacade.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Facade implementation of the StatusRepository which dispatches request to the underlying node and component status
+ * repositories. The contained repositories might use different storage implementation.
+ */
+public class ComponentStatusRepositoryFacade implements StatusRepository {

Review comment:
       This class should be called `StatusRepositoryFacade` instead because it is not component status repository specific.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org