You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/02/23 19:01:46 UTC

[GitHub] [nifi] turcsanyip commented on a change in pull request #4839: NIFI-8113 Adding persistent status history repository backed by embedded QuestDB

turcsanyip commented on a change in pull request #4839:
URL: https://github.com/apache/nifi/pull/4839#discussion_r581216003



##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/QuestDbComponentStatusStorage.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage;
+
+import io.questdb.cairo.sql.Record;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.commons.math3.util.Pair;
+import org.apache.nifi.controller.status.history.ComponentDetailsStorage;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardStatusHistory;
+import org.apache.nifi.controller.status.history.StatusHistory;
+import org.apache.nifi.controller.status.history.StatusSnapshot;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.apache.nifi.controller.status.history.questdb.QuestDbEntityReadingTemplate;
+import org.apache.nifi.controller.status.history.questdb.QuestDbEntityWritingTemplate;
+import org.apache.nifi.controller.status.history.questdb.QuestDbStatusSnapshotMapper;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * Component specific implementation of the {@link ComponentStatusStorage}.
+ *
+ * @param <T> Component status entry type.
+ */
+abstract class QuestDbComponentStatusStorage<T> implements ComponentStatusStorage<T> {

Review comment:
       It might be worth separating these `QuestDb***StatusStorage` implementation classes to their own subpackage.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/StorageStatusReadingTemplate.java
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor;
+import org.apache.nifi.controller.status.history.StandardMetricDescriptor;
+import org.apache.nifi.controller.status.history.questdb.QuestDbReadingTemplate;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class StorageStatusReadingTemplate extends QuestDbReadingTemplate<Map<Long, Map<StandardMetricDescriptor<NodeStatus>, Long>>> {
+    private static final String STORAGE_FREE_DESCRIPTION = "The usable space available for use by the underlying storage mechanism.";
+    private static final String STORAGE_USED_DESCRIPTION = "The space in use on the underlying storage mechanism";
+
+    private static final String STORAGE_READING_QUERY =
+            "SELECT * FROM storageStatus " +
+                    "WHERE capturedAt > to_timestamp('%s', '" + StatusStorage.CAPTURE_DATE_FORMAT + "') " +
+                    "AND capturedAt < to_timestamp('%s', '" + StatusStorage.CAPTURE_DATE_FORMAT + "') " +
+                    "ORDER BY capturedAt ASC";
+
+    public StorageStatusReadingTemplate() {
+        super(STORAGE_READING_QUERY, e -> Collections.emptyMap());
+    }
+
+    @Override
+    protected Map<Long, Map<StandardMetricDescriptor<NodeStatus>, Long>> processResult(final RecordCursor cursor) {
+        final Map<Long, Map<StandardMetricDescriptor<NodeStatus>, Long>> result = new HashMap<>();
+
+        int storageNumber = 1;
+
+        while (cursor.hasNext()) {
+            final Record record = cursor.getRecord();
+            final long createdAt = TimeUnit.MICROSECONDS.toMillis(record.getTimestamp(0));
+            final short type = record.getShort(2);
+            final CharSequence name = record.getSym(1);
+
+            if (!result.containsKey(createdAt)) {
+                result.put(createdAt, new HashMap<>());
+            }
+
+            result.get(createdAt).put(getDescriptor(
+                    QuestDbNodeStatusStorage.getMetrics().size() + result.get(createdAt).size(),
+                    getField(type, storageNumber, StorageMetric.FREE),
+                    getLabel(type, name, StorageMetric.FREE),
+                    STORAGE_FREE_DESCRIPTION
+            ), record.getLong(3));
+            result.get(createdAt).put(getDescriptor(
+                    QuestDbNodeStatusStorage.getMetrics().size() + result.get(createdAt).size(),
+                    getField(type, storageNumber, StorageMetric.USED),
+                    getLabel(type, name, StorageMetric.USED),
+                    STORAGE_USED_DESCRIPTION
+            ), record.getLong(4));
+            storageNumber++;
+        }
+
+        return result;
+    }
+
+    private StandardMetricDescriptor<NodeStatus> getDescriptor(final int ordinal, final String field, final String label, final String description) {
+        return new StandardMetricDescriptor<>(() -> ordinal, field, label, STORAGE_FREE_DESCRIPTION, MetricDescriptor.Formatter.DATA_SIZE, v -> 0L
+        );

Review comment:
       Minor formatting: `);` should be in the same line.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbStatusHistoryRepository.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.MessageBusImpl;
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.math3.util.Pair;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.NodeStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.apache.nifi.controller.status.history.questdb.QuestDbDatabaseManager;
+import org.apache.nifi.controller.status.history.storage.BufferedWriterFlushWorker;
+import org.apache.nifi.controller.status.history.storage.BufferedWriterForStatusStorage;
+import org.apache.nifi.controller.status.history.storage.ComponentStatusStorage;
+import org.apache.nifi.controller.status.history.storage.GarbageCollectionStatusStorage;
+import org.apache.nifi.controller.status.history.storage.NodeStatusStorage;
+import org.apache.nifi.controller.status.history.storage.ProcessorStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbConnectionStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbGarbageCollectionStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbNodeStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbProcessGroupStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbProcessorStatusStorage;
+import org.apache.nifi.controller.status.history.storage.QuestDbRemoteProcessGroupStatusStorage;
+import org.apache.nifi.util.NiFiProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class EmbeddedQuestDbStatusHistoryRepository implements StatusHistoryRepository {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedQuestDbStatusHistoryRepository.class);
+    private static final int PERSIST_BATCH_SIZE = 1000;
+    private static final long PERSIST_FREQUENCY = TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS);
+    private static final long ROLL_FREQUENCY = TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
+
+    private final InMemoryComponentDetailsStorage componentDetailsProvider = new InMemoryComponentDetailsStorage();
+    private final ScheduledExecutorService scheduledExecutorService = Executors
+            .newScheduledThreadPool(3, new BasicThreadFactory.Builder().namingPattern("EmbeddedQuestDbStatusHistoryRepositoryWorker-%d").build());
+
+    private final QuestDbContext dbContext;
+    private final long persistFrequency;
+    private final int daysToKeepNodeData;
+    private final int daysToKeepComponentData;
+
+    private final ProcessorStatusStorage processorStatusStorage;
+    private final ComponentStatusStorage<ConnectionStatus> connectionStatusStorage;
+    private final ComponentStatusStorage<ProcessGroupStatus> processGroupStatusStorage;
+    private final ComponentStatusStorage<RemoteProcessGroupStatus> remoteProcessGroupStatusStorage;
+    private final NodeStatusStorage nodeStatusStorage;
+    private final GarbageCollectionStatusStorage garbageCollectionStatusStorage;
+
+    private final BufferedWriterForStatusStorage<ProcessorStatus> processorStatusWriter;
+    private final BufferedWriterForStatusStorage<ConnectionStatus> connectionStatusWriter;
+    private final BufferedWriterForStatusStorage<ProcessGroupStatus> processGroupStatusWriter;
+    private final BufferedWriterForStatusStorage<RemoteProcessGroupStatus> remoteProcessGroupStatusWriter;
+    private final BufferedWriterForStatusStorage<NodeStatus> nodeStatusWriter;
+    private final BufferedWriterForStatusStorage<GarbageCollectionStatus> garbageCollectionStatusWriter;
+
+    /**
+     * Default no args constructor for service loading only
+     */
+    public EmbeddedQuestDbStatusHistoryRepository() {
+        dbContext = null;
+        persistFrequency = PERSIST_FREQUENCY;
+        daysToKeepNodeData = -1;
+        daysToKeepComponentData = -1;
+
+        processorStatusStorage = null;
+        connectionStatusStorage = null;
+        processGroupStatusStorage = null;
+        remoteProcessGroupStatusStorage = null;
+        nodeStatusStorage = null;
+        garbageCollectionStatusStorage = null;
+
+        processorStatusWriter = null;
+        connectionStatusWriter = null;
+        processGroupStatusWriter = null;
+        remoteProcessGroupStatusWriter = null;
+        nodeStatusWriter = null;
+        garbageCollectionStatusWriter = null;
+    }
+
+    public EmbeddedQuestDbStatusHistoryRepository(final NiFiProperties niFiProperties) {
+        this(niFiProperties, PERSIST_FREQUENCY);
+    }
+
+    EmbeddedQuestDbStatusHistoryRepository(final NiFiProperties niFiProperties, final long persistFrequency) {
+        final Path persistLocation = niFiProperties.getQuestDbStatusRepositoryPath();
+        final CairoConfiguration configuration = new DefaultCairoConfiguration(persistLocation.toString());
+        QuestDbDatabaseManager.checkDatabaseStatus(persistLocation);
+
+        this.persistFrequency = persistFrequency;
+        daysToKeepNodeData = getDaysToKeepNodeData(niFiProperties);
+        daysToKeepComponentData = getDaysToKeepComponentData(niFiProperties);
+        dbContext = new QuestDbContext(new CairoEngine(configuration), new MessageBusImpl());
+
+        nodeStatusStorage = new QuestDbNodeStatusStorage(dbContext);
+        garbageCollectionStatusStorage = new QuestDbGarbageCollectionStatusStorage(dbContext);
+        processorStatusStorage = new QuestDbProcessorStatusStorage(dbContext, componentDetailsProvider);
+        connectionStatusStorage = new QuestDbConnectionStatusStorage(dbContext, componentDetailsProvider);
+        processGroupStatusStorage = new QuestDbProcessGroupStatusStorage(dbContext, componentDetailsProvider);
+        remoteProcessGroupStatusStorage = new QuestDbRemoteProcessGroupStatusStorage(dbContext, componentDetailsProvider);
+
+        nodeStatusWriter = new BufferedWriterForStatusStorage<>(nodeStatusStorage, PERSIST_BATCH_SIZE);
+        garbageCollectionStatusWriter = new BufferedWriterForStatusStorage<>(garbageCollectionStatusStorage, PERSIST_BATCH_SIZE);
+        processorStatusWriter = new BufferedWriterForStatusStorage<>(processorStatusStorage, PERSIST_BATCH_SIZE);
+        connectionStatusWriter = new BufferedWriterForStatusStorage<>(connectionStatusStorage, PERSIST_BATCH_SIZE);
+        processGroupStatusWriter = new BufferedWriterForStatusStorage<>(processGroupStatusStorage, PERSIST_BATCH_SIZE);
+        remoteProcessGroupStatusWriter = new BufferedWriterForStatusStorage<>(remoteProcessGroupStatusStorage, PERSIST_BATCH_SIZE);
+    }
+
+    @Override
+    public void start() {
+        LOGGER.debug("Starting status history repository");
+
+        final EmbeddedQuestDbRolloverHandler nodeRolloverHandler = new EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getNodeTableNames(), daysToKeepNodeData, dbContext);
+        final EmbeddedQuestDbRolloverHandler componentRolloverHandler = new EmbeddedQuestDbRolloverHandler(QuestDbDatabaseManager.getComponentTableNames(), daysToKeepComponentData, dbContext);
+        final BufferedWriterFlushWorker writer = new BufferedWriterFlushWorker(Arrays.asList(
+            nodeStatusWriter,
+            garbageCollectionStatusWriter,
+            processorStatusWriter,
+            connectionStatusWriter,
+            processGroupStatusWriter,
+            remoteProcessGroupStatusWriter
+        ));
+
+        scheduledExecutorService.scheduleWithFixedDelay(nodeRolloverHandler, 0, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
+        scheduledExecutorService.scheduleWithFixedDelay(componentRolloverHandler, 0, ROLL_FREQUENCY, TimeUnit.MILLISECONDS);
+        scheduledExecutorService.scheduleWithFixedDelay(writer, 0, persistFrequency, TimeUnit.MILLISECONDS);
+
+        LOGGER.debug("Status history repository is started");
+    }
+
+    @Override
+    public void shutdown() {
+        LOGGER.debug("Component status repository is shutting down");

Review comment:
       Minor: not component specific.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/EmbeddedQuestDbRolloverHandler.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history;
+
+import io.questdb.cairo.sql.Record;
+import io.questdb.cairo.sql.RecordCursor;
+import io.questdb.cairo.sql.RecordCursorFactory;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.controller.status.history.questdb.QuestDbContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * QuestDB does not provide the possibility for deleting individual lines. Instead there is the option to drop older
+ * partitions. In order to clean up older status information, the partitions are outside of the scope of data we intend
+ * to keep will be deleted.
+ */
+public class EmbeddedQuestDbRolloverHandler implements Runnable {

Review comment:
       Not sure but it is rather "purge" than "rollover" what this class is doing.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/BufferedWriterFlushWorker.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BufferedWriterFlushWorker implements Runnable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(BufferedWriterFlushWorker.class);
+
+    private final List<BufferedWriter<?>> bufferedWriterList = new ArrayList<>();
+
+    public BufferedWriterFlushWorker(final List<BufferedWriter<?>> bufferedWriterList) {
+        this.bufferedWriterList.addAll(bufferedWriterList);
+    }
+
+    @Override
+    public void run() {
+        try {
+            bufferedWriterList.forEach(statusRepository -> statusRepository.flush());

Review comment:
       Minor: it is not a `statusRepository` specific function.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
##########
@@ -120,11 +120,18 @@ nifi.provenance.repository.concurrent.merge.threads=${nifi.provenance.repository
 # Volatile Provenance Respository Properties
 nifi.provenance.repository.buffer.size=${nifi.provenance.repository.buffer.size}
 
-# Component Status Repository
+# Component and Node Status History Repository
 nifi.components.status.repository.implementation=${nifi.components.status.repository.implementation}
+
+# Volatile Status History Repository Properties
 nifi.components.status.repository.buffer.size=${nifi.components.status.repository.buffer.size}
 nifi.components.status.snapshot.frequency=${nifi.components.status.snapshot.frequency}
 
+# QuestDB Status History Repository Properties
+# nifi.status.repository.questdb.persist.node.days=${nifi.status.repository.questdb.persist.node.days}
+# nifi.status.repository.questdb.persist.component.days=${nifi.status.repository.questdb.persist.component.days}
+# nifi.status.repository.questdb.persist.location=${nifi.status.repository.questdb.persist.location}

Review comment:
       I would consider to add these properties uncommented. In that way it would be clear which values are in effect. 

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/questdb/QuestDbDatabaseManager.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.questdb;
+
+import io.questdb.MessageBusImpl;
+import io.questdb.cairo.CairoConfiguration;
+import io.questdb.cairo.CairoEngine;
+import io.questdb.cairo.DefaultCairoConfiguration;
+import io.questdb.griffin.SqlCompiler;
+import io.questdb.griffin.SqlExecutionContext;
+import io.questdb.griffin.SqlExecutionContextImpl;
+import org.apache.nifi.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * The database manager is responsible for checking and maintaining the health of the database during startup.
+ */
+public final class QuestDbDatabaseManager {
+    private enum DatabaseStatus {
+        HEALTHY, NON_EXISTING, CORRUPTED;
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(QuestDbQueries.class);

Review comment:
       Wrong class for logger.

##########
File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/storage/ComponentCounterWritingTemplate.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.history.storage;
+
+import io.questdb.cairo.TableWriter;
+import org.apache.commons.math3.util.Pair;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.questdb.QuestDbWritingTemplate;
+
+import java.util.Collection;
+import java.util.Date;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+class ComponentCounterWritingTemplate extends QuestDbWritingTemplate<Pair<Date, ProcessorStatus>> {
+
+    public ComponentCounterWritingTemplate() {
+        super("componentCounter");
+    }
+
+    @Override
+    protected void addRows(final TableWriter tableWriter, final Collection<Pair<Date, ProcessorStatus>> entries) {
+        for (final Pair<Date, ProcessorStatus> entry : entries) {
+            final Map<String, Long> counters = entry.getSecond().getCounters();
+
+            if (counters != null && counters.size() > 0) {
+                for (final Map.Entry<String, Long> counter : counters.entrySet()) {
+                    final long measuredAt = TimeUnit.MILLISECONDS.toMicros(entry.getFirst().getTime());
+                    final TableWriter.Row counterRow = tableWriter.newRow(measuredAt);
+                    counterRow.putSym(1,  entry.getSecond().getId());
+                    counterRow.putSym(2,  counter.getKey());
+                    counterRow.putLong(3,  counter.getValue());

Review comment:
       Minor formatting: double spaces.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org