Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/23 16:02:35 UTC

[GitHub] [nifi] simonbence opened a new pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

simonbence opened a new pull request #4839:
URL: https://github.com/apache/nifi/pull/4839


   _Note: this review is a successor of [PR 4821](https://github.com/apache/nifi/pull/4821). In order to keep the changes readable I decided to abandon that one and start a fresh pull request._
   
   This is a proposal for persistent status history covering both component-level and node-level metrics. I chose to approach the problem using `QuestDB`, an embeddable time series database written in Java. During my tests, both the performance and the memory footprint looked reasonable.
   
   As for the code organisation of the new implementation, I worked with three concepts. The repository is the top-level entity and provides the service for the clients (`FlowController`, etc.). In general this is the same as before, with minor changes: the name has been changed to `StatusHistoryRepository` to better reflect its intent, and start and stop hooks were added. The storage classes are part of the repositories and are responsible for managing the details of a given type, such as processor status or node status. Finally, the `WriterTemplates` and `ReaderTemplates` are merely helpers that deal with `QuestDB` API calls.
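The three-layer organisation described above (repository, storage, templates) can be sketched with plain Java types. This is a hypothetical illustration of the layering only: `StatusStorageSketch`, `InMemoryStatusStorage` and `SketchStatusHistoryRepository` are invented names, not the PR's classes, and an in-memory list stands in for the QuestDB-backed storage and its writer/reader templates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical storage layer: manages the details of one status type.
interface StatusStorageSketch<T> {
    void store(long capturedAtMillis, T status);

    List<T> read(long fromMillis, long toMillis);
}

// Stand-in for a QuestDB-backed storage; real code would delegate to writer/reader templates.
class InMemoryStatusStorage<T> implements StatusStorageSketch<T> {
    private static final class Entry<E> {
        final long capturedAt;
        final E status;

        Entry(final long capturedAt, final E status) {
            this.capturedAt = capturedAt;
            this.status = status;
        }
    }

    private final List<Entry<T>> entries = new ArrayList<>();

    @Override
    public synchronized void store(final long capturedAtMillis, final T status) {
        entries.add(new Entry<>(capturedAtMillis, status));
    }

    @Override
    public synchronized List<T> read(final long fromMillis, final long toMillis) {
        // Exclusive bounds, mirroring "capturedAt > start AND capturedAt < end" in the quoted queries.
        return entries.stream()
                .filter(e -> e.capturedAt > fromMillis && e.capturedAt < toMillis)
                .map(e -> e.status)
                .collect(Collectors.toList());
    }
}

// Hypothetical top-level repository: the entry point clients such as FlowController would call.
class SketchStatusHistoryRepository {
    private final StatusStorageSketch<String> nodeStatusStorage = new InMemoryStatusStorage<>();
    private volatile boolean started;

    void start() {
        started = true;
    }

    void shutdown() {
        started = false;
    }

    void captureNodeStatus(final long capturedAtMillis, final String nodeStatus) {
        if (!started) {
            throw new IllegalStateException("Repository is not started");
        }
        nodeStatusStorage.store(capturedAtMillis, nodeStatus);
    }

    List<String> getNodeStatusHistory(final long fromMillis, final long toMillis) {
        return nodeStatusStorage.read(fromMillis, toMillis);
    }
}
```

Clients only see the repository; swapping the storage implementation does not change the repository's contract.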
   
   Some remarks on design decisions:
   
   - The PR contains a small fix in `ProcessGroupStatusDescriptor#calculateTaskMillis()`: previously every nested level performed its own nano->milli conversion, and these conversions accumulated, reducing the task time of child groups to 0. This should now be fixed (the QuestDB tests implicitly prove this as well).
   - The configuration contains the new parameters, but they are commented out. They are relevant only for the QuestDB implementation; if they are not set, the implementation falls back to default values. At this point I think the `VolatileComponentStatusRepository` should be kept as the default implementation.
   - The implementation depends on the latest 4.x version of QuestDB. QuestDB is currently at 5.0.5, but the 5.x line requires Java 11. There are some non-vital features in 5.x (the all_tables request, dropping partitions using a WHERE clause), but we can do without them.
   - When it is not embedded, QuestDB can be accessed through the `PostgreSQL` driver via `JDBC`, and it also supports the `InfluxDB` protocol. These possibilities were not utilised: QuestDB's own API seems sufficient and robust for our purposes.
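One way a per-level conversion can shrink nested task times to 0, as described in the `calculateTaskMillis()` remark, is when a parent treats the milliseconds returned for a child as if they were still nanoseconds and converts them again. The sketch below reproduces that effect on a hypothetical group tree; `Group` and `TaskTime` are illustrative names, not NiFi code, and the exact shape of the original bug may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical process group: its own task time in nanoseconds plus child groups.
class Group {
    final long ownTaskNanos;
    final List<Group> children = new ArrayList<>();

    Group(final long ownTaskNanos) {
        this.ownTaskNanos = ownTaskNanos;
    }

    Group add(final Group child) {
        children.add(child);
        return this;
    }
}

final class TaskTime {
    private TaskTime() {
    }

    // Buggy: each level converts to millis, and the parent converts the child's millis once more.
    static long buggyTaskMillis(final Group group) {
        long nanos = group.ownTaskNanos;
        for (final Group child : group.children) {
            nanos += buggyTaskMillis(child); // millis added as if they were nanos
        }
        return TimeUnit.NANOSECONDS.toMillis(nanos);
    }

    // Fixed: accumulate nanoseconds over the whole subtree and convert exactly once.
    static long fixedTaskMillis(final Group group) {
        return TimeUnit.NANOSECONDS.toMillis(totalNanos(group));
    }

    private static long totalNanos(final Group group) {
        long nanos = group.ownTaskNanos;
        for (final Group child : group.children) {
            nanos += totalNanos(child);
        }
        return nanos;
    }
}
```

With a grandchild that spent 5 seconds of task time, the buggy variant reports 5000 ms for the grandchild itself but 0 ms for its ancestors, while the fixed variant reports 5000 ms at the root.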
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
        in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] turcsanyip commented on a change in pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4839:
URL: https://github.com/apache/nifi/pull/4839#discussion_r582046000



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/questdb/QuestDbComponentStatusStorage.java
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage.questdb;
+
+import io.questdb.cairo.sql.Record;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.commons.lang3.time.FastDateFormat;

Review comment:
       This Date API from Commons Lang has not been used in NiFi so far.
   I think we should try to use the `java.time` API in new code if possible.
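As an illustration of this suggestion, the formatting and date arithmetic that `FastDateFormat` and `DateUtils` would typically cover can be done with `java.time` alone. The pattern below is an assumption, since the actual value of `StatusStorage.CAPTURE_DATE_FORMAT` is not visible in the diff, and `CaptureTimes` is a hypothetical helper.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class CaptureTimes {
    // Assumed pattern; the real CAPTURE_DATE_FORMAT is not shown in the diff.
    static final String CAPTURE_DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

    // DateTimeFormatter is immutable and thread-safe, like FastDateFormat.
    // An explicit zone is required to format an Instant.
    private static final DateTimeFormatter CAPTURE_FORMATTER =
            DateTimeFormatter.ofPattern(CAPTURE_DATE_PATTERN).withZone(ZoneOffset.UTC);

    private CaptureTimes() {
    }

    static String format(final Instant capturedAt) {
        return CAPTURE_FORMATTER.format(capturedAt);
    }

    // Comparable to DateUtils.addDays(date, -days) for a retention cut-off (exact 24-hour days).
    static Instant daysBefore(final Instant reference, final int days) {
        return reference.minus(Duration.ofDays(days));
    }
}
```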

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
##########
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.MessageBusImpl;
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.math3.util.Pair;

Review comment:
       I would suggest using `ImmutablePair` from Commons Lang instead.







[GitHub] [nifi] turcsanyip commented on a change in pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4839:
URL: https://github.com/apache/nifi/pull/4839#discussion_r581216003



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/QuestDbComponentStatusStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage;
+
+import io.questdb.cairo.sql.Record;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.commons.math3.util.Pair;
+import org.apache.nifi.controller.status.history.ComponentDetailsStorage;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusHistory;
+import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.apache.nifi.controller.status.history.questdb.QuestDbEntityReadingTemplate;
+import org.apache.nifi.controller.status.history.questdb.QuestDbEntityWritingTemplate;
+import org.apache.nifi.controller.status.history.questdb.QuestDbStatusSnapshotMapper;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Component specific implementation of the {@link ComponentStatusStorage}.
+ *
+ * @param <T> Component status entry type.
+ */
+abstract class QuestDbComponentStatusStorage<T> implements ComponentStatusStorage<T> {

Review comment:
       It might be worth moving these `QuestDb***StatusStorage` implementation classes into their own subpackage.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/StorageStatusReadingTemplate.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
+import org.apache.nifi.controller.status.history.questdb.QuestDbReadingTemplate;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class StorageStatusReadingTemplate extends QuestDbReadingTemplate<Map<Long, Map<StandardMetricDescriptor<NodeStatus>, Long>>> {
+    private static final String STORAGE_FREE_DESCRIPTION = "The usable space available for use by the underlying storage mechanism.";
+    private static final String STORAGE_USED_DESCRIPTION = "The space in use on the underlying storage mechanism";
+
+    private static final String STORAGE_READING_QUERY =
+            "SELECT * FROM storageStatus " +
+                    "WHERE capturedAt > to_timestamp('%s', '" + StatusStorage.CAPTURE_DATE_FORMAT + "') " +
+                    "AND capturedAt < to_timestamp('%s', '" + StatusStorage.CAPTURE_DATE_FORMAT + "') " +
+                    "ORDER BY capturedAt ASC";
+
+    public StorageStatusReadingTemplate() {
+        super(STORAGE_READING_QUERY, e -> Collections.emptyMap());
+    }
+
+    @Override
+    protected Map<Long, Map<StandardMetricDescriptor<NodeStatus>, Long>> processResult(final RecordCursor cursor) {
+        final Map<Long, Map<StandardMetricDescriptor<NodeStatus>, Long>> result = new HashMap<>();
+
+        int storageNumber = 1;
+
+        while (cursor.hasNext()) {
+            final Record record = cursor.getRecord();
+            final long createdAt = TimeUnit.MICROSECONDS.toMillis(record.getTimestamp(0));
+            final short type = record.getShort(2);
+            final CharSequence name = record.getSym(1);
+
+            if (!result.containsKey(createdAt)) {
+                result.put(createdAt, new HashMap<>());
+            }
+
+            result.get(createdAt).put(getDescriptor(
+                    QuestDbNodeStatusStorage.getMetrics().size() + result.get(createdAt).size(),
+                    getField(type, storageNumber, StorageMetric.FREE),
+                    getLabel(type, name, StorageMetric.FREE),
+                    STORAGE_FREE_DESCRIPTION
+            ), record.getLong(3));
+            result.get(createdAt).put(getDescriptor(
+                    QuestDbNodeStatusStorage.getMetrics().size() + result.get(createdAt).size(),
+                    getField(type, storageNumber, StorageMetric.USED),
+                    getLabel(type, name, StorageMetric.USED),
+                    STORAGE_USED_DESCRIPTION
+            ), record.getLong(4));
+            storageNumber++;
+        }
+
+        return result;
+    }
+
+    private StandardMetricDescriptor<NodeStatus> getDescriptor(final int ordinal, final String field, final String label, final String description) {
+        return new StandardMetricDescriptor<>(() -> ordinal, field, label, STORAGE_FREE_DESCRIPTION, MetricDescriptor.Formatter.DATA_SIZE, v -> 0L
+        );

Review comment:
       Minor formatting: `);` should be on the same line.
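Apart from the formatting, the per-timestamp grouping in `processResult` could use `Map.computeIfAbsent` instead of the `containsKey`/`put`/`get` sequence. The sketch below is hypothetical: it uses plain string keys instead of `StandardMetricDescriptor`, a made-up `StorageRow` type in place of the QuestDB `Record`, and invented metric labels.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical row: capture time in millis, storage name, free and used bytes.
final class StorageRow {
    final long capturedAtMillis;
    final String name;
    final long free;
    final long used;

    StorageRow(final long capturedAtMillis, final String name, final long free, final long used) {
        this.capturedAtMillis = capturedAtMillis;
        this.name = name;
        this.free = free;
        this.used = used;
    }
}

final class StorageGrouping {
    private StorageGrouping() {
    }

    // Groups rows by capture time; each row contributes a "free" and a "used" metric.
    static Map<Long, Map<String, Long>> group(final List<StorageRow> rows) {
        final Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (final StorageRow row : rows) {
            // Creates the inner map on first use of a timestamp, then reuses it.
            final Map<String, Long> metrics = result.computeIfAbsent(row.capturedAtMillis, k -> new LinkedHashMap<>());
            metrics.put(row.name + " Free Space", row.free);
            metrics.put(row.name + " Used Space", row.used);
        }
        return result;
    }
}
```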

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.MessageBusImpl;
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.math3.util.Pair;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.apache.nifi.controller.status.history.questdb.QuestDbDatabaseManager;
+import org.apache.nifi.controller.status.history.storage.BufferedWriterFlushWorker;
+import org.apache.nifi.controller.status.history.storage.BufferedWriterForStatusStorage;
+import org.apache.nifi.controller.status.history.storage.ComponentStatusStorage;
+import org.apache.nifi.controller.status.history.storage.GarbageCollectionStatusStorage;
+import org.apache.nifi.controller.status.history.storage.NodeStatusStorage;
+import org.apache.nifi.controller.status.history.storage.ProcessorStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbConnectionStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbGarbageCollectionStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbNodeStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbProcessGroupStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbProcessorStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbRemoteProcessGroupStatusStorage;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class EmbeddedQuestDbStatusHistoryRepository implements StatusHistoryRepository {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbStatusHistoryRepository.class);
+    private static final int PERSIST_BATCH_SIZE = 1000;
+    private static final long PERSIST_FREQUENCY = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
+    private static final long ROLL_FREQUENCY = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
+
+    private final InMemoryComponentDetailsStorage componentDetailsProvider = new InMemoryComponentDetailsStorage();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(3, new BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbStatusHistoryRepositoryWorker-%d").build());
+
+    private final QuestDbContext dbContext;
+    private final long persistFrequency;
+    private final int daysToKeepNodeData;
+    private final int daysToKeepComponentData;
+
+    private final ProcessorStatusStorage processorStatusStorage;
+    private final ComponentStatusStorage<ConnectionStatus> connectionStatusStorage;
+    private final ComponentStatusStorage<ProcessGroupStatus> processGroupStatusStorage;
+    private final ComponentStatusStorage<RemoteProcessGroupStatus> remoteProcessGroupStatusStorage;
+    private final NodeStatusStorage nodeStatusStorage;
+    private final GarbageCollectionStatusStorage garbageCollectionStatusStorage;
+
+    private final BufferedWriterForStatusStorage<ProcessorStatus> processorStatusWriter;
+    private final BufferedWriterForStatusStorage<ConnectionStatus> connectionStatusWriter;
+    private final BufferedWriterForStatusStorage<ProcessGroupStatus> processGroupStatusWriter;
+    private final BufferedWriterForStatusStorage<RemoteProcessGroupStatus> remoteProcessGroupStatusWriter;
+    private final BufferedWriterForStatusStorage<NodeStatus> nodeStatusWriter;
+    private final BufferedWriterForStatusStorage<GarbageCollectionStatus> garbageCollectionStatusWriter;
+
+    /**
+     * Default no args constructor for service loading only
+     */
+    public EmbeddedQuestDbStatusHistoryRepository() {
+        dbContext = null;
+        persistFrequency = PERSIST_FREQUENCY;
+        daysToKeepNodeData = -1;
+        daysToKeepComponentData = -1;
+
+        processorStatusStorage = null;
+        connectionStatusStorage = null;
+        processGroupStatusStorage = null;
+        remoteProcessGroupStatusStorage = null;
+        nodeStatusStorage = null;
+        garbageCollectionStatusStorage = null;
+
+        processorStatusWriter = null;
+        connectionStatusWriter = null;
+        processGroupStatusWriter = null;
+        remoteProcessGroupStatusWriter = null;
+        nodeStatusWriter = null;
+        garbageCollectionStatusWriter = null;
+    }
+
+    public EmbeddedQuestDbStatusHistoryRepository(final NiFiProperties niFiProperties) {
+        this(niFiProperties, PERSIST_FREQUENCY);
+    }
+
+    EmbeddedQuestDbStatusHistoryRepository(final NiFiProperties niFiProperties, final long persistFrequency) {
+        final Path persistLocation = niFiProperties.getQuestDbStatusRepositoryPath();
+        final CairoConfiguration configuration = new DefaultCairoConfiguration(persistLocation.toString());
+        QuestDbDatabaseManager.checkDatabaseStatus(persistLocation);
+
+        this.persistFrequency = persistFrequency;
+        daysToKeepNodeData = getDaysToKeepNodeData(niFiProperties);
+        daysToKeepComponentData = getDaysToKeepComponentData(niFiProperties);
+        dbContext = new QuestDbContext(new CairoEngine(configuration), new MessageBusImpl());
+
+        nodeStatusStorage = new QuestDbNodeStatusStorage(dbContext);
+        garbageCollectionStatusStorage = new QuestDbGarbageCollectionStatusStorage(dbContext);
+        processorStatusStorage = new QuestDbProcessorStatusStorage(dbContext, componentDetailsProvider);
+        connectionStatusStorage = new QuestDbConnectionStatusStorage(dbContext, componentDetailsProvider);
+        processGroupStatusStorage = new QuestDbProcessGroupStatusStorage(dbContext, componentDetailsProvider);
+        remoteProcessGroupStatusStorage = new QuestDbRemoteProcessGroupStatusStorage(dbContext, componentDetailsProvider);
+
+        nodeStatusWriter = new BufferedWriterForStatusStorage<>(nodeStatusStorage, PERSIST_BATCH_SIZE);
+        garbageCollectionStatusWriter = new BufferedWriterForStatusStorage<>(garbageCollectionStatusStorage, PERSIST_BATCH_SIZE);
+        processorStatusWriter = new BufferedWriterForStatusStorage<>(processorStatusStorage, PERSIST_BATCH_SIZE);
+        connectionStatusWriter = new BufferedWriterForStatusStorage<>(connectionStatusStorage, PERSIST_BATCH_SIZE);
+        processGroupStatusWriter = new BufferedWriterForStatusStorage<>(processGroupStatusStorage, PERSIST_BATCH_SIZE);
+        remoteProcessGroupStatusWriter = new BufferedWriterForStatusStorage<>(remoteProcessGroupStatusStorage, PERSIST_BATCH_SIZE);
+    }
+
+    @Override
+    public void start() {
+        LOGGER.debug("Starting status history repository");
+
+        final EmbeddedQuestDbRolloverHandler nodeRolloverHandler = new EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getNodeTableNames(), daysToKeepNodeData, dbContext);
+        final EmbeddedQuestDbRolloverHandler componentRolloverHandler = new EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getComponentTableNames(), daysToKeepComponentData, dbContext);
+        final BufferedWriterFlushWorker writer = new BufferedWriterFlushWorker(Arrays.asList(
+            nodeStatusWriter,
+            garbageCollectionStatusWriter,
+            processorStatusWriter,
+            connectionStatusWriter,
+            processGroupStatusWriter,
+            remoteProcessGroupStatusWriter
+        ));
+
+        scheduledExecutorService.scheduleWithFixedDelay(nodeRolloverHandler, 0, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
+        scheduledExecutorService.scheduleWithFixedDelay(componentRolloverHandler, 0, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
+        scheduledExecutorService.scheduleWithFixedDelay(writer, 0, persistFrequency, TimeUnit.MILLISECONDS);
+
+        LOGGER.debug("Status history repository is started");
+    }
+
+    @Override
+    public void shutdown() {
+        LOGGER.debug("Component status repository is shutting down");

Review comment:
       Minor: the log message should not mention the component status repository; this repository is not component specific.
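The `start()` method above schedules the purge handlers and the flush worker on a shared `ScheduledExecutorService`. The body of the quoted `shutdown()` is not visible in the diff, so the following is only a hypothetical sketch of a common counterpart: stop the scheduler, wait briefly for a running task, then flush once more. `ScheduledLifecycle`, the 5-second timeout and the final flush are assumptions of this example.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ScheduledLifecycle {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final Runnable flushTask;
    private final long flushFrequencyMillis;

    ScheduledLifecycle(final Runnable flushTask, final long flushFrequencyMillis) {
        this.flushTask = flushTask;
        this.flushFrequencyMillis = flushFrequencyMillis;
    }

    void start() {
        // Fixed delay: the next run starts only after the previous one has finished.
        scheduler.scheduleWithFixedDelay(flushTask, 0, flushFrequencyMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdown(); // periodic tasks are not re-run after this point
        try {
            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                scheduler.shutdownNow();
            }
        } catch (final InterruptedException e) {
            scheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        flushTask.run(); // final flush so entries buffered since the last run are not lost
    }

    boolean isStopped() {
        return scheduler.isTerminated();
    }
}
```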

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not provide the possibility for deleting individual lines. Instead there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions are outside of the scope of data we intend
+ * to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {

Review comment:
       Not sure, but this class seems to perform a "purge" rather than a "rollover".
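As the Javadoc explains, QuestDB cannot delete individual rows, so old data is removed by dropping whole partitions. A hypothetical sketch of selecting the daily partitions outside the retention window follows. It assumes day-partitioned tables whose partitions are named `yyyy-MM-dd` and an `ALTER TABLE ... DROP PARTITION LIST ...` statement; both should be checked against the QuestDB version in use, and `PartitionPurgeSketch` is an illustrative name.

```java
import java.time.LocalDate;
import java.util.List;
import java.util.stream.Collectors;

final class PartitionPurgeSketch {
    private PartitionPurgeSketch() {
    }

    // Returns the partitions strictly older than (today - daysToKeep), assuming ISO yyyy-MM-dd names.
    static List<String> partitionsToDrop(final List<String> existing, final LocalDate today, final int daysToKeep) {
        final LocalDate oldestKept = today.minusDays(daysToKeep);
        return existing.stream()
                .filter(name -> LocalDate.parse(name).isBefore(oldestKept))
                .sorted()
                .collect(Collectors.toList());
    }

    // Builds the drop statement, or returns null when nothing has to be purged.
    static String dropStatement(final String table, final List<String> partitions) {
        if (partitions.isEmpty()) {
            return null;
        }
        return "ALTER TABLE " + table + " DROP PARTITION LIST "
                + partitions.stream().map(p -> "'" + p + "'").collect(Collectors.joining(", "));
    }
}
```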

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BufferedWriterFlushWorker implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BufferedWriterFlushWorker.class);
+
+    private final List<BufferedWriter<?>> bufferedWriterList = new ArrayList<>();
+
+    public BufferedWriterFlushWorker(final List<BufferedWriter<?>> bufferedWriterList) {
+        this.bufferedWriterList.addAll(bufferedWriterList);
+    }
+
+    @Override
+    public void run() {
+        try {
+            bufferedWriterList.forEach(statusRepository -> statusRepository.flush());

Review comment:
       Minor: the items being flushed are buffered writers, so `statusRepository` is a misleading name for the lambda parameter.
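The flush worker walks a list of buffered writers, each of which collects status entries in memory and hands them to its storage in batches (the repository above uses `PERSIST_BATCH_SIZE = 1000`). A hypothetical sketch of that pairing follows; `SimpleBufferedWriter`, `FlushWorker` and the `Consumer<List<T>>` sink are invented for the example. Because an exception escaping a task scheduled with `scheduleWithFixedDelay` suppresses its subsequent executions, the worker must not let failures propagate; catching per writer, so one failing storage does not block the others, is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical buffered writer: collects entries and writes them in batches on flush.
final class SimpleBufferedWriter<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<List<T>> storage;
    private final int batchSize;

    SimpleBufferedWriter(final Consumer<List<T>> storage, final int batchSize) {
        this.storage = storage;
        this.batchSize = batchSize;
    }

    void collect(final T entry) {
        queue.add(entry);
    }

    void flush() {
        final List<T> batch = new ArrayList<>(batchSize);
        T entry;
        while ((entry = queue.poll()) != null) {
            batch.add(entry);
            if (batch.size() == batchSize) {
                storage.accept(new ArrayList<>(batch)); // hand over a copy, then reuse the buffer
                batch.clear();
            }
        }
        if (!batch.isEmpty()) {
            storage.accept(batch);
        }
    }
}

final class FlushWorker implements Runnable {
    private final List<SimpleBufferedWriter<?>> writers;

    FlushWorker(final List<? extends SimpleBufferedWriter<?>> writers) {
        this.writers = new ArrayList<>(writers);
    }

    @Override
    public void run() {
        for (final SimpleBufferedWriter<?> writer : writers) {
            try {
                writer.flush();
            } catch (final RuntimeException e) {
                // Keep flushing the remaining writers; real code would log through SLF4J.
                System.err.println("Flushing failed: " + e);
            }
        }
    }
}
```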

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,18 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status History Repository
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Volatile Status History Repository Properties
 nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
 nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
 
+# QuestDB Status History Repository Properties
+# nifi.status.repository.questdb.persist.node.days=${nifi.status.repository.questdb.persist.node.days}
+# nifi.status.repository.questdb.persist.component.days=${nifi.status.repository.questdb.persist.component.days}
+# nifi.status.repository.questdb.persist.location=${nifi.status.repository.questdb.persist.location}

Review comment:
       I would consider adding these properties uncommented. That way it would be clear which values are in effect.
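A commented-out key is simply absent once the file is loaded, so the implementation silently falls back to a value hard-coded in code; that is the visibility problem the comment points at. A minimal sketch of such a fallback with `java.util.Properties` follows. The property keys come from the diff, but the default values and the `QuestDbSettings` helper are hypothetical, as the PR's actual defaults are not visible here.

```java
import java.util.Properties;

final class QuestDbSettings {
    static final String NODE_DAYS = "nifi.status.repository.questdb.persist.node.days";
    static final String COMPONENT_DAYS = "nifi.status.repository.questdb.persist.component.days";
    static final String LOCATION = "nifi.status.repository.questdb.persist.location";

    // Hypothetical defaults: invisible to anyone reading only nifi.properties.
    static final int DEFAULT_NODE_DAYS = 14;
    static final int DEFAULT_COMPONENT_DAYS = 3;
    static final String DEFAULT_LOCATION = "./status_repository";

    private QuestDbSettings() {
    }

    static int nodeDays(final Properties props) {
        return intValue(props, NODE_DAYS, DEFAULT_NODE_DAYS);
    }

    static int componentDays(final Properties props) {
        return intValue(props, COMPONENT_DAYS, DEFAULT_COMPONENT_DAYS);
    }

    static String location(final Properties props) {
        return props.getProperty(LOCATION, DEFAULT_LOCATION).trim();
    }

    // Missing or blank values fall back to the default; otherwise the trimmed value is parsed.
    private static int intValue(final Properties props, final String key, final int defaultValue) {
        final String value = props.getProperty(key);
        return value == null || value.trim().isEmpty() ? defaultValue : Integer.parseInt(value.trim());
    }
}
```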

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import io.questdb.MessageBusImpl;
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import io.questdb.griffin.SqlExecutionContextImpl;
+import org.apache.nifi.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The database manager is responsible for checking and maintaining the health of the database during startup.
+ */
+public final class QuestDbDatabaseManager {
+    private enum DatabaseStatus {
+        HEALTHY, NON_EXISTING, CORRUPTED;
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(QuestDbQueries.class);

Review comment:
       Wrong class for logger.
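For context, the logger name is derived from the class literal passed to the factory. A copy-pasted literal therefore makes the messages of `QuestDbDatabaseManager` show up under `QuestDbQueries`. In the SLF4J code above, the presumable fix is `LoggerFactory.getLogger(QuestDbDatabaseManager.class)`. The sketch below shows the same effect with `java.util.logging`, so it runs without SLF4J. Both class names in it are hypothetical stand-ins.

```java
import java.util.logging.Logger;

// Illustration only: java.util.logging stands in for SLF4J; both class names are hypothetical.
class QueriesStandIn {
}

class DatabaseManagerStandIn {
    // Copy-paste slip: records logged here would be attributed to QueriesStandIn.
    static final Logger WRONG_LOGGER = Logger.getLogger(QueriesStandIn.class.getName());
    // Intended form: the logger is named after the declaring class.
    static final Logger LOGGER = Logger.getLogger(DatabaseManagerStandIn.class.getName());

    public static void main(String[] args) {
        // Prints the two logger names, showing which class each one reports as its source.
        System.out.println(WRONG_LOGGER.getName());
        System.out.println(LOGGER.getName());
    }
}
```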

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/ComponentCounterWritingTemplate.java
##########
@@ -0,0 +1,52 @@
+package org.apache.nifi.controller.status.history.storage;
+
+import io.questdb.cairo.TableWriter;
+import org.apache.commons.math3.util.Pair;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.questdb.QuestDbWritingTemplate;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+class ComponentCounterWritingTemplate extends QuestDbWritingTemplate<Pair<Date, ProcessorStatus>> {
+
+    public ComponentCounterWritingTemplate() {
+        super("componentCounter");
+    }
+
+    @Override
+    protected void addRows(final TableWriter tableWriter, final Collection<Pair<Date, ProcessorStatus>> entries) {
+        for (final Pair<Date, ProcessorStatus> entry : entries) {
+            final Map<String, Long> counters = entry.getSecond().getCounters();
+
+            if (counters != null && counters.size() > 0) {
+                for (final Map.Entry<String, Long> counter : counters.entrySet()) {
+                    final long measuredAt = TimeUnit.MILLISECONDS.toMicros(entry.getFirst().getTime());
+                    final TableWriter.Row counterRow = tableWriter.newRow(measuredAt);
+                    counterRow.putSym(1,  entry.getSecond().getId());
+                    counterRow.putSym(2,  counter.getKey());
+                    counterRow.putLong(3,  counter.getValue());

Review comment:
       Minor formatting: double spaces.
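As a reading aid for the template above, it emits one row per (snapshot, counter) pair. Each row uses the snapshot time, converted from milliseconds to microseconds, as its timestamp, with the component id, counter name, and counter value in columns 1 to 3. The QuestDB-free sketch below reproduces only that flattening. The `CounterRow` and `Snapshot` types are hypothetical stand-ins for `TableWriter.Row` and `Pair<Date, ProcessorStatus>`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// QuestDB-free sketch of the row layout; CounterRow, Snapshot and toRows are hypothetical names.
class CounterRowsSketch {
    // One output row: timestamp (micros), component id, counter name, counter value.
    static final class CounterRow {
        final long measuredAtMicros;
        final String componentId;
        final String counterName;
        final long value;

        CounterRow(long measuredAtMicros, String componentId, String counterName, long value) {
            this.measuredAtMicros = measuredAtMicros;
            this.componentId = componentId;
            this.counterName = counterName;
            this.value = value;
        }
    }

    // Stand-in for Pair<Date, ProcessorStatus>: capture time, processor id, and its counters.
    static final class Snapshot {
        final Date capturedAt;
        final String componentId;
        final Map<String, Long> counters; // may be null, which the template guards against

        Snapshot(Date capturedAt, String componentId, Map<String, Long> counters) {
            this.capturedAt = capturedAt;
            this.componentId = componentId;
            this.counters = counters;
        }
    }

    static List<CounterRow> toRows(Collection<Snapshot> snapshots) {
        List<CounterRow> rows = new ArrayList<>();
        for (Snapshot snapshot : snapshots) {
            if (snapshot.counters == null || snapshot.counters.isEmpty()) {
                continue; // no counters, no rows
            }
            // Same millis-to-micros conversion the template applies before TableWriter#newRow.
            long measuredAt = TimeUnit.MILLISECONDS.toMicros(snapshot.capturedAt.getTime());
            for (Map.Entry<String, Long> counter : snapshot.counters.entrySet()) {
                rows.add(new CounterRow(measuredAt, snapshot.componentId, counter.getKey(), counter.getValue()));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        Map<String, Long> counters = new LinkedHashMap<>();
        counters.put("records.read", 5L);
        counters.put("records.written", 7L);
        List<CounterRow> rows = toRows(List.of(
                new Snapshot(new Date(1_000L), "processor-1", counters),
                new Snapshot(new Date(2_000L), "processor-2", null)));
        for (CounterRow row : rows) {
            System.out.println(row.measuredAtMicros + " " + row.componentId + " " + row.counterName + " " + row.value);
        }
    }
}
```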




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] turcsanyip commented on a change in pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #4839:
URL: https://github.com/apache/nifi/pull/4839#discussion_r582795444



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not provide a way to delete individual lines. Instead, there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions that are outside the scope of the data we
+ * intend to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {

Review comment:
       I see. Thanks for the explanation. That's fine with me.
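The Javadoc above describes the cleanup as dropping whole partitions rather than deleting rows. The following is a hypothetical sketch of the selection step. It assumes day-sized partitions named in `yyyy-MM-dd` form and a retention expressed in days. The statement builder targets QuestDB's `ALTER TABLE ... DROP PARTITION LIST` syntax, but the table name and the way the handler actually issues the statement are assumptions.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the selection step only; the yyyy-MM-dd partition naming is an assumption.
class PartitionRetentionSketch {
    // Returns partitions whose day falls before (today - daysToKeep); those are the ones to drop.
    static List<String> partitionsToDrop(List<String> partitionNames, LocalDate today, int daysToKeep) {
        LocalDate cutoff = today.minusDays(daysToKeep);
        List<String> toDrop = new ArrayList<>();
        for (String name : partitionNames) {
            // LocalDate.parse uses ISO_LOCAL_DATE, matching the assumed yyyy-MM-dd names.
            if (LocalDate.parse(name).isBefore(cutoff)) {
                toDrop.add(name);
            }
        }
        return toDrop;
    }

    // Builds a DROP PARTITION LIST statement; callers should skip an empty selection.
    static String dropStatement(String table, List<String> partitions) {
        String list = partitions.stream().map(p -> "'" + p + "'").collect(Collectors.joining(", "));
        return "ALTER TABLE " + table + " DROP PARTITION LIST " + list;
    }

    public static void main(String[] args) {
        List<String> drop = partitionsToDrop(
                List.of("2021-02-20", "2021-02-21", "2021-02-22", "2021-02-23"),
                LocalDate.of(2021, 2, 23), 1);
        System.out.println(dropStatement("exampleTable", drop)); // "exampleTable" is hypothetical
    }
}
```

An empty selection would produce an incomplete statement, which is why the caller is expected to check for it before issuing anything.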







[GitHub] [nifi] markap14 commented on pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

Posted by GitBox <gi...@apache.org>.
markap14 commented on pull request #4839:
URL: https://github.com/apache/nifi/pull/4839#issuecomment-786235902


   Thanks @simonbence! All looks good to me. I made a couple of minor updates to the admin guide to help clarify a few properties, but didn't think it was worth commenting on the PR and waiting for updates before merging, so I went ahead and made the adjustments before pushing. +1, merged to main. Many thanks!





[GitHub] [nifi] tpalfy commented on a change in pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

Posted by GitBox <gi...@apache.org>.
tpalfy commented on a change in pull request #4839:
URL: https://github.com/apache/nifi/pull/4839#discussion_r582151895



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriter.java
##########
@@ -0,0 +1,35 @@
+package org.apache.nifi.controller.status.history.storage;
+
+/**
+ * Wraps a writer object in order to buffer incoming store requests and dispatch them in batches.
+ */
+public interface BufferedWriter<T> {

Review comment:
       Considering `java.io` already has a `BufferedWriter` class, could we rename this to `BufferedEntityWriter`?
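The Javadoc describes the contract as buffering incoming store requests and dispatching them in batches. Here is a minimal sketch of that contract under the proposed `BufferedEntryWriter` name. The `collect`/`flush` methods and the `Consumer`-based delegate are assumptions for illustration, not the PR's actual methods.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical shape for the renamed interface; method names are assumptions.
interface BufferedEntryWriter<T> {
    void collect(T entry); // buffer one store request
    void flush();          // dispatch everything buffered so far as one batch
}

// Minimal thread-safe implementation that hands each batch to a delegate writer.
final class ListBufferedEntryWriter<T> implements BufferedEntryWriter<T> {
    private final Consumer<Collection<T>> delegate;
    private List<T> buffer = new ArrayList<>();

    ListBufferedEntryWriter(Consumer<Collection<T>> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void collect(T entry) {
        buffer.add(entry);
    }

    @Override
    public void flush() {
        final List<T> batch;
        synchronized (this) {
            if (buffer.isEmpty()) {
                return; // nothing buffered, nothing to write
            }
            batch = buffer;            // take ownership of the current buffer
            buffer = new ArrayList<>(); // new requests go into a fresh buffer
        }
        delegate.accept(batch); // write outside the lock so collectors are not blocked
    }

    public static void main(String[] args) {
        BufferedEntryWriter<String> writer =
                new ListBufferedEntryWriter<>(batch -> System.out.println("writing batch of " + batch.size()));
        writer.collect("a");
        writer.collect("b");
        writer.flush(); // prints "writing batch of 2"
        writer.flush(); // buffer is empty, so nothing is written
    }
}
```

Swapping the buffer under the lock and writing outside it keeps producers from waiting on a slow batch write.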

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ComponentDetailsStorage.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import java.util.Map;
+
+/**
+ * Stores and returns with the details of a given component. Implementations are expected to be thread safe.

Review comment:
       ```suggestion
    * Stores and returns the details of a given component. Implementations are expected to be thread safe.
   ```
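One way to meet the "expected to be thread safe" requirement is sketched below. It uses a `ConcurrentHashMap` keyed by component id and stores unmodifiable copies of each details map. The class and method names are hypothetical, since the interface's methods are not shown in this excerpt.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical thread-safe storage; names are assumptions, not the interface in the PR.
final class InMemoryComponentDetailsStorage {
    // ConcurrentHashMap makes individual store/lookup calls safe across threads.
    private final Map<String, Map<String, String>> detailsByComponentId = new ConcurrentHashMap<>();

    void store(String componentId, Map<String, String> details) {
        // Defensive, unmodifiable copy: callers cannot mutate what other threads read.
        detailsByComponentId.put(componentId, Collections.unmodifiableMap(new HashMap<>(details)));
    }

    Map<String, String> details(String componentId) {
        // Unknown components yield an empty map rather than null.
        return detailsByComponentId.getOrDefault(componentId, Collections.emptyMap());
    }

    public static void main(String[] args) {
        InMemoryComponentDetailsStorage storage = new InMemoryComponentDetailsStorage();
        Map<String, String> details = new HashMap<>();
        details.put("name", "GenerateFlowFile");
        storage.store("processor-1", details);
        details.put("name", "changed after storing"); // does not affect the stored copy
        System.out.println(storage.details("processor-1").get("name")); // GenerateFlowFile
    }
}
```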







[GitHub] [nifi] simonbence commented on a change in pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4839:
URL: https://github.com/apache/nifi/pull/4839#discussion_r582774123



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriter.java
##########
@@ -0,0 +1,35 @@
+package org.apache.nifi.controller.status.history.storage;
+
+/**
+ * Wraps a writer object in order to buffer incoming store requests and dispatch them in batches.
+ */
+public interface BufferedWriter<T> {

Review comment:
       Valid point. As both implementations mention entries, I will go with `BufferedEntryWriter`.







[GitHub] [nifi] simonbence commented on a change in pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

Posted by GitBox <gi...@apache.org>.
simonbence commented on a change in pull request #4839:
URL: https://github.com/apache/nifi/pull/4839#discussion_r581741855



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not provide a way to delete individual lines. Instead, there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions that are outside the scope of the data we
+ * intend to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {

Review comment:
       I wanted to go with a more general term. The key point of this feature is to prevent the data set from growing indefinitely. One way is to purge the data, but in the long run I would like to extend this with some aggregating features: older data chunks would be merged to represent larger spans of time.
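The aggregation idea mentioned here, merging older chunks so that each represents a longer period, is essentially downsampling. The following is a hypothetical sketch, not part of this PR, that averages raw `(timestampMillis, value)` samples into fixed-size buckets. Averaging is an assumption; counters would more likely be summed.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Hypothetical downsampling step; the PR itself only drops old partitions.
final class DownsampleSketch {
    // Merges (timestampMillis -> value) samples into buckets of bucketMillis, averaging each bucket.
    static TreeMap<Long, Double> downsample(Map<Long, Long> samples, long bucketMillis) {
        TreeMap<Long, long[]> sums = new TreeMap<>(); // bucket start -> {sum, count}
        for (Map.Entry<Long, Long> sample : samples.entrySet()) {
            long bucketStart = Math.floorDiv(sample.getKey(), bucketMillis) * bucketMillis;
            long[] sumAndCount = sums.computeIfAbsent(bucketStart, k -> new long[2]);
            sumAndCount[0] += sample.getValue();
            sumAndCount[1]++;
        }
        TreeMap<Long, Double> averages = new TreeMap<>();
        sums.forEach((bucketStart, sc) -> averages.put(bucketStart, (double) sc[0] / sc[1]));
        return averages;
    }

    public static void main(String[] args) {
        Map<Long, Long> samples = new TreeMap<>();
        samples.put(0L, 10L);          // minute 0 of hour 0
        samples.put(60_000L, 20L);     // minute 1 of hour 0
        samples.put(3_600_000L, 30L);  // minute 0 of hour 1
        System.out.println(downsample(samples, TimeUnit.HOURS.toMillis(1))); // {0=15.0, 3600000=30.0}
    }
}
```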







[GitHub] [nifi] markap14 closed pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

Posted by GitBox <gi...@apache.org>.
markap14 closed pull request #4839:
URL: https://github.com/apache/nifi/pull/4839


   

