Posted to commits@nifi.apache.org by bb...@apache.org on 2020/09/03 14:22:16 UTC
[nifi] branch main updated: NIFI-7557: uses a canonical
representation of strings when recovering data from FlowFile Repository in
order to avoid using huge amounts of heap when not necessary - Fixed some
problems with unit/integration tests
This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new fd068fe NIFI-7557: uses a canonical representation of strings when recovering data from FlowFile Repository in order to avoid using huge amounts of heap when not necessary - Fixed some problems with unit/integration tests
fd068fe is described below
commit fd068fe978da5a4a3e5df14b9a8e43f51e88c606
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Jun 18 13:25:08 2020 -0400
NIFI-7557: uses a canonical representation of strings when recovering data from FlowFile Repository in order to avoid using huge amounts of heap when not necessary
- Fixed some problems with unit/integration tests
This closes #4507.
Signed-off-by: Bryan Bende <bb...@apache.org>
---
.../apache/nifi/repository/schema/FieldCache.java | 55 ++++++++++++++
.../nifi/repository/schema/NoOpFieldCache.java | 29 +++++++
.../nifi/repository/schema/SchemaRecordReader.java | 14 ++--
.../repository/schema/TestSchemaRecordReader.java | 4 +-
.../schema/TestSchemaRecordReaderWriter.java | 4 +-
.../wali/.SequentialAccessWriteAheadLog.java.swp | Bin 16384 -> 0 bytes
.../nifi-flowfile-repo-serialization/pom.xml | 32 ++++----
.../controller/repository/CaffeineFieldCache.java | 43 +++++++++++
.../EncryptedRepositoryRecordSerdeFactory.java | 7 +-
.../repository/SchemaRepositoryRecordSerde.java | 7 +-
.../StandardRepositoryRecordSerdeFactory.java | 10 ++-
.../nifi/controller/FileSystemSwapManager.java | 6 +-
.../org/apache/nifi/controller/FlowController.java | 83 +++++++++------------
.../repository/WriteAheadFlowFileRepository.java | 39 ++++++++--
.../controller/swap/SchemaSwapDeserializer.java | 15 +++-
.../apache/nifi/encrypt/StringEncryptorIT.groovy | 12 ---
...cryptedSequentialAccessWriteAheadLogTest.groovy | 5 +-
.../SchemaRepositoryRecordSerdeTest.java | 3 +-
.../OOMEWriteAheadFlowFileRepository.java | 5 +-
.../provenance/ByteArraySchemaRecordReader.java | 3 +-
.../provenance/EventIdFirstSchemaRecordReader.java | 5 +-
21 files changed, 275 insertions(+), 106 deletions(-)
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldCache.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldCache.java
new file mode 100644
index 0000000..9186a51
--- /dev/null
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/FieldCache.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.repository.schema;
+
+/**
+ * <p>
+ * For many write-ahead logs, the keys and values that are stored for fields are very repetitive. As a result, it is common when reading data from the repository
+ * to read the same value over and over, but each time the value is read, the data is a separate object in memory.
+ * </p>
+ *
+ * <p>
+ * Take, for example, the case when the value "Hello World" is stored as a field in nearly every record that is written to the WAL. When the record is created,
+ * it may be created in such a way that a single String is referenced over and over again. However, when the Write-Ahead Log is restored, each time that value is encountered,
+ * a new String must be created because it is being deserialized from an InputStream. So instead of a single String occupying approximately 25 bytes of heap, if this is encountered
+ * 1 million times, the result is that 1 million 25-byte Strings remain on the heap, totaling about 25 MB of heap space.
+ * </p>
+ *
+ * <p>
+ * In order to avoid this, a FieldCache can be provided to the SerDe. As a result, whenever a value is read, that value is added to a cache as the "canonical representation" of that
+ * value. The next time that value is encountered, if the first instance is still available in the cache, the canonical representation will be returned. As a result, we end up creating the
+ * first String with the value of "Hello World" and then the second instance. The second instance is then used to look up the canonical representation (the first instance) and the canonical
+ * representation is then included in the record. The second instance is then garbage collected. As a result, even with millions of records having the value "Hello World" only a single
+ * instance needs to be kept in heap.
+ * </p>
+ */
+public interface FieldCache {
+
+ /**
+ * Checks whether the given value already exists in the cache and, if so, returns it. If the value does not
+ * already exist in the cache, adds the given value to the cache, evicting one or more existing entries if necessary
+ * @param value the value to cache
+ * @return the canonical representation of the value that should be used
+ */
+ String cache(String value);
+
+ /**
+ * Clears the cache
+ */
+ void clear();
+}
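Illustration only, not part of the commit: the "canonical representation" idea described in the FieldCache Javadoc above can be sketched with a plain HashMap. MapFieldCache and CanonicalStringDemo are hypothetical names; the nested FieldCache interface only mirrors the two methods shown in the diff. This sketch is unbounded and not thread-safe, whereas the commit uses a bounded Caffeine-backed cache.

```java
import java.util.HashMap;
import java.util.Map;

final class CanonicalStringDemo {

    // Mirrors the two methods of the FieldCache interface shown in the diff.
    interface FieldCache {
        String cache(String value);

        void clear();
    }

    // Hypothetical unbounded implementation, for illustration only.
    static final class MapFieldCache implements FieldCache {
        private final Map<String, String> canonical = new HashMap<>();

        @Override
        public String cache(final String value) {
            // putIfAbsent returns the existing mapping, or null if 'value' was just added.
            final String existing = canonical.putIfAbsent(value, value);
            return existing == null ? value : existing;
        }

        @Override
        public void clear() {
            canonical.clear();
        }
    }

    public static void main(final String[] args) {
        final FieldCache cache = new MapFieldCache();

        // new String(...) forces distinct objects, much as deserialization would.
        final String first = cache.cache(new String("Hello World"));
        final String second = cache.cache(new String("Hello World"));

        // Both references point at the first instance, so this prints "true".
        System.out.println(first == second);
    }
}
```

The second String is only used as a lookup key and becomes garbage as soon as the canonical instance is returned, which is the effect the Javadoc describes.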
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/NoOpFieldCache.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/NoOpFieldCache.java
new file mode 100644
index 0000000..662f8ca
--- /dev/null
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/NoOpFieldCache.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.repository.schema;
+
+public class NoOpFieldCache implements FieldCache {
+ @Override
+ public String cache(final String value) {
+ return value;
+ }
+
+ @Override
+ public void clear() {
+ }
+}
diff --git a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
index daedf37..68f116b 100644
--- a/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
+++ b/nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
@@ -35,13 +35,15 @@ import java.util.Optional;
public class SchemaRecordReader {
private final RecordSchema schema;
+ private final FieldCache fieldCache;
- public SchemaRecordReader(final RecordSchema schema) {
+ private SchemaRecordReader(final RecordSchema schema, final FieldCache fieldCache) {
this.schema = schema;
+ this.fieldCache = fieldCache;
}
- public static SchemaRecordReader fromSchema(final RecordSchema schema) {
- return new SchemaRecordReader(schema);
+ public static SchemaRecordReader fromSchema(final RecordSchema schema, final FieldCache fieldCache) {
+ return new SchemaRecordReader(schema, fieldCache);
}
private static void fillBuffer(final InputStream in, final byte[] destination) throws IOException {
@@ -194,13 +196,15 @@ public class SchemaRecordReader {
}
case STRING: {
final DataInputStream dis = new DataInputStream(in);
- return dis.readUTF();
+ final String value = dis.readUTF();
+ return fieldCache.cache(value);
}
case LONG_STRING: {
final int length = readInt(in);
final byte[] buffer = new byte[length];
fillBuffer(in, buffer);
- return new String(buffer, StandardCharsets.UTF_8);
+ final String value = new String(buffer, StandardCharsets.UTF_8);
+ return fieldCache.cache(value);
}
case BYTE_ARRAY: {
final int length = readInt(in);
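Illustration only, not part of the commit: the STRING change in the hunk above passes each DataInputStream.readUTF() result through the cache. A minimal, self-contained sketch of that pattern follows. ReadUtfCachingDemo, writeRepeated, readAll and canonicalizer are hypothetical names, and a HashMap stands in for the FieldCache.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

final class ReadUtfCachingDemo {

    // Writes the same value 'count' times, as a repetitive field would appear in a log.
    static byte[] writeRepeated(final String value, final int count) {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            for (int i = 0; i < count; i++) {
                dos.writeUTF(value);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Reads 'count' strings, passing every readUTF() result through the given cache function.
    static List<String> readAll(final byte[] data, final int count, final UnaryOperator<String> cache) {
        final List<String> values = new ArrayList<>();
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            for (int i = 0; i < count; i++) {
                final String value = dis.readUTF();
                values.add(cache.apply(value));
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return values;
    }

    // Stand-in for FieldCache.cache(): returns the first instance seen for each distinct value.
    static UnaryOperator<String> canonicalizer() {
        final Map<String, String> canonical = new HashMap<>();
        return value -> {
            final String existing = canonical.putIfAbsent(value, value);
            return existing == null ? value : existing;
        };
    }

    public static void main(final String[] args) {
        final byte[] data = writeRepeated("Hello World", 3);
        final List<String> cached = readAll(data, 3, canonicalizer());

        // All three list entries refer to the same String instance.
        System.out.println(cached.get(0) == cached.get(1) && cached.get(1) == cached.get(2));
    }
}
```

Passing UnaryOperator.identity() instead of canonicalizer() corresponds to the NoOpFieldCache used by the tests in this commit.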
diff --git a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReader.java b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReader.java
index 6099814..d5a4cfd 100644
--- a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReader.java
+++ b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReader.java
@@ -63,7 +63,7 @@ public class TestSchemaRecordReader {
})));
final RecordSchema schema = new RecordSchema(fields);
- final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema);
+ final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache());
final byte[] buffer;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -167,7 +167,7 @@ public class TestSchemaRecordReader {
final RecordSchema schema = new RecordSchema(fields);
- final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema);
+ final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache());
// for each field, make the first one missing and the second one present.
final byte[] buffer;
diff --git a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
index 5dfd40e..5dc5cd8 100644
--- a/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
+++ b/nifi-commons/nifi-schema-utils/src/test/java/org/apache/nifi/repository/schema/TestSchemaRecordReaderWriter.java
@@ -142,7 +142,7 @@ public class TestSchemaRecordReaderWriter {
try (final InputStream in = new ByteArrayInputStream(baos.toByteArray())) {
// Read the Schema from the stream and create a Record Reader for reading records, based on this schema
final RecordSchema readSchema = RecordSchema.readFrom(in);
- final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema);
+ final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema, new NoOpFieldCache());
// Read two records and verify the values.
for (int i=0; i < 2; i++) {
@@ -216,7 +216,7 @@ public class TestSchemaRecordReaderWriter {
try (final InputStream in = new ByteArrayInputStream(baos.toByteArray())) {
// Read the Schema from the stream and create a Record Reader for reading records, based on this schema
final RecordSchema readSchema = RecordSchema.readFrom(in);
- final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema);
+ final SchemaRecordReader reader = SchemaRecordReader.fromSchema(readSchema, new NoOpFieldCache());
// Read the records and verify the values.
for (int i=0; i < 2; i++) {
diff --git a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp b/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp
deleted file mode 100644
index 1d5f641..0000000
Binary files a/nifi-commons/nifi-write-ahead-log/src/main/java/org/apache/nifi/wali/.SequentialAccessWriteAheadLog.java.swp and /dev/null differ
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
index bafa8dd..6a25488 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml
@@ -1,13 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
- license agreements. See the NOTICE file distributed with this work for additional
- information regarding copyright ownership. The ASF licenses this file to
- You under the Apache License, Version 2.0 (the "License"); you may not use
- this file except in compliance with the License. You may obtain a copy of
- the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
- by applicable law or agreed to in writing, software distributed under the
- License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
- OF ANY KIND, either express or implied. See the License for the specific
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -47,10 +47,16 @@
<artifactId>nifi-security-utils</artifactId>
</dependency>
<dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ <version>2.8.1</version>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-properties-loader</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
@@ -61,9 +67,5 @@
<version>${nifi.groovy.version}</version>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-properties-loader</artifactId>
- </dependency>
</dependencies>
</project>
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/CaffeineFieldCache.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/CaffeineFieldCache.java
new file mode 100644
index 0000000..22874da
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/CaffeineFieldCache.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.nifi.repository.schema.FieldCache;
+
+public class CaffeineFieldCache implements FieldCache {
+ private final Cache<String, String> cache;
+
+ public CaffeineFieldCache(final long maxCharacters) {
+ cache = Caffeine.newBuilder()
+ .maximumWeight(maxCharacters)
+ .weigher((k, v) -> ((String) k).length())
+ .build();
+ }
+
+ @Override
+ public String cache(final String value) {
+ return cache.get(value, k -> value);
+ }
+
+ @Override
+ public void clear() {
+ cache.invalidateAll();
+ }
+}
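Illustration only, not part of the commit: CaffeineFieldCache above bounds the cache by total key length via maximumWeight and a weigher. The sketch below shows the same "budget measured in characters" idea using only the JDK. CharBoundedCache and WeightedCacheDemo are hypothetical names, and the eviction order here is plain least-recently-used, which is an assumption made for simplicity and is not Caffeine's eviction policy.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

final class WeightedCacheDemo {

    // Hypothetical character-budgeted cache; LRU eviction is a simplification.
    static final class CharBoundedCache {
        private final long maxCharacters;
        private long currentCharacters = 0L;

        // accessOrder = true: iteration starts at the least recently used entry.
        private final LinkedHashMap<String, String> entries = new LinkedHashMap<>(16, 0.75f, true);

        CharBoundedCache(final long maxCharacters) {
            this.maxCharacters = maxCharacters;
        }

        synchronized String cache(final String value) {
            final String existing = entries.get(value); // get() also refreshes recency
            if (existing != null) {
                return existing;
            }

            entries.put(value, value);
            currentCharacters += value.length(); // weight of an entry = key length

            // Evict least recently used entries until the budget is respected again.
            final Iterator<Map.Entry<String, String>> it = entries.entrySet().iterator();
            while (currentCharacters > maxCharacters && it.hasNext()) {
                final String evicted = it.next().getKey();
                it.remove();
                currentCharacters -= evicted.length();
            }
            return value;
        }

        synchronized int size() {
            return entries.size();
        }

        synchronized void clear() {
            entries.clear();
            currentCharacters = 0L;
        }
    }

    public static void main(final String[] args) {
        final CharBoundedCache cache = new CharBoundedCache(10);
        cache.cache("aaaaa");
        cache.cache("bbbbb");
        cache.cache("ccccc"); // exceeds the 10-character budget, so "aaaaa" is evicted
        System.out.println(cache.size());
    }
}
```

A value longer than the whole budget is returned to the caller but not retained, so the cache never grows past maxCharacters.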
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java
index 5b28177..714fc7f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/EncryptedRepositoryRecordSerdeFactory.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository;
import java.io.IOException;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.FieldCache;
import org.apache.nifi.security.kms.CryptoUtils;
import org.apache.nifi.security.kms.EncryptionException;
import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration;
@@ -32,8 +33,8 @@ public class EncryptedRepositoryRecordSerdeFactory extends StandardRepositoryRec
private FlowFileRepositoryEncryptionConfiguration ffrec;
- public EncryptedRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager, NiFiProperties niFiProperties) throws EncryptionException {
- super(claimManager);
+ public EncryptedRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager, final NiFiProperties niFiProperties, final FieldCache fieldCache) throws EncryptionException {
+ super(claimManager, fieldCache);
// Retrieve encryption configuration
FlowFileRepositoryEncryptionConfiguration ffrec = new FlowFileRepositoryEncryptionConfiguration(niFiProperties);
@@ -48,7 +49,7 @@ public class EncryptedRepositoryRecordSerdeFactory extends StandardRepositoryRec
}
@Override
- public SerDe<SerializedRepositoryRecord> createSerDe(String encodingName) {
+ public SerDe<SerializedRepositoryRecord> createSerDe(final String encodingName) {
// If no encoding is provided, use the encrypted as the default
if (encodingName == null || EncryptedSchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
// Delegate the creation of the wrapped serde to the standard factory
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
index ee87f1b..12933f9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -25,6 +25,7 @@ import org.apache.nifi.controller.repository.schema.FlowFileSchema;
import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap;
import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate;
+import org.apache.nifi.repository.schema.FieldCache;
import org.apache.nifi.repository.schema.FieldType;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordIterator;
@@ -49,11 +50,13 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1;
private final ResourceClaimManager resourceClaimManager;
+ private final FieldCache fieldCache;
private volatile SchemaRecordReader reader;
private RecordIterator recordIterator = null;
- public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) {
+ public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager, final FieldCache fieldCache) {
this.resourceClaimManager = resourceClaimManager;
+ this.fieldCache = fieldCache;
}
@Override
@@ -101,7 +104,7 @@ public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implement
@Override
public void readHeader(final DataInputStream in) throws IOException {
final RecordSchema recoverySchema = RecordSchema.readFrom(in);
- reader = SchemaRecordReader.fromSchema(recoverySchema);
+ reader = SchemaRecordReader.fromSchema(recoverySchema, fieldCache);
}
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java
index ac1989c..884105a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecordSerdeFactory.java
@@ -18,21 +18,29 @@
package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.FieldCache;
+import org.apache.nifi.repository.schema.NoOpFieldCache;
import org.wali.SerDe;
import org.wali.UpdateType;
public class StandardRepositoryRecordSerdeFactory implements RepositoryRecordSerdeFactory {
private static final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde";
private final ResourceClaimManager resourceClaimManager;
+ private final FieldCache fieldCache;
public StandardRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) {
+ this(claimManager, new NoOpFieldCache());
+ }
+
+ public StandardRepositoryRecordSerdeFactory(final ResourceClaimManager claimManager, final FieldCache fieldCache) {
this.resourceClaimManager = claimManager;
+ this.fieldCache = fieldCache;
}
@Override
public SerDe<SerializedRepositoryRecord> createSerDe(final String encodingName) {
if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) {
- final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager);
+ final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager, fieldCache);
return serde;
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 01cd272..8f0caa8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.CaffeineFieldCache;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
@@ -35,6 +36,7 @@ import org.apache.nifi.controller.swap.SwapDeserializer;
import org.apache.nifi.controller.swap.SwapSerializer;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.repository.schema.FieldCache;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
@@ -78,11 +80,11 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap");
private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+?(\\..*?)?\\.swap\\.part");
- public static final int SWAP_ENCODING_VERSION = 10;
public static final String EVENT_CATEGORY = "Swap FlowFiles";
private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class);
private final File storageDirectory;
+ private final FieldCache fieldCache = new CaffeineFieldCache(10_000_000);
// effectively final
private FlowFileRepository flowFileRepository;
@@ -372,7 +374,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
if (Arrays.equals(magicHeader, MAGIC_HEADER)) {
final String serializationName = dis.readUTF();
if (serializationName.equals(SchemaSwapDeserializer.getSerializationName())) {
- return new SchemaSwapDeserializer();
+ return new SchemaSwapDeserializer(fieldCache);
}
throw new IOException("Cannot find a suitable Deserializer for swap file, written with Serialization Name '" + serializationName + "'");
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 500cb15..130a476 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,38 +16,6 @@
*/
package org.apache.nifi.controller;
-import static java.util.Objects.requireNonNull;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.management.GarbageCollectorMXBean;
-import java.lang.management.ManagementFactory;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-import javax.management.NotificationEmitter;
-import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@@ -114,7 +82,6 @@ import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
-import org.apache.nifi.controller.repository.EncryptedRepositoryRecordSerdeFactory;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
@@ -127,7 +94,6 @@ import org.apache.nifi.controller.repository.StandardQueueProvider;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
-import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -218,13 +184,45 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.concurrency.TimedLock;
-import org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.management.NotificationEmitter;
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.management.GarbageCollectorMXBean;
+import java.lang.management.ManagementFactory;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+
public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
// default repository implementations
@@ -787,18 +785,11 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
try {
- final FlowFileRepository created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName,
- FlowFileRepository.class, properties);
- if (EncryptedSequentialAccessWriteAheadLog.class.getName().equals(properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION))
- && created instanceof WriteAheadFlowFileRepository) {
- synchronized (created) {
- ((WriteAheadFlowFileRepository) created).initialize(contentClaimManager, new EncryptedRepositoryRecordSerdeFactory(contentClaimManager, properties));
- }
- } else {
- synchronized (created) {
- created.initialize(contentClaimManager);
- }
+ final FlowFileRepository created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileRepository.class, properties);
+ synchronized (created) {
+ created.initialize(contentClaimManager);
}
+
return created;
} catch (final Exception e) {
throw new RuntimeException(e);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 432e4e4..f069de6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -22,8 +22,11 @@ import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.repository.schema.FieldCache;
+import org.apache.nifi.security.kms.EncryptionException;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog;
import org.apache.nifi.wali.SequentialAccessWriteAheadLog;
import org.apache.nifi.wali.SnapshotCapture;
import org.slf4j.Logger;
@@ -86,26 +89,29 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
private static final String WRITE_AHEAD_LOG_IMPL = "nifi.flowfile.repository.wal.implementation";
private static final String RETAIN_ORPHANED_FLOWFILES = "nifi.flowfile.repository.retain.orphaned.flowfiles";
+ private static final String FLOWFILE_REPO_CACHE_SIZE = "nifi.flowfile.repository.wal.cache.characters";
static final String SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.SequentialAccessWriteAheadLog";
static final String ENCRYPTED_SEQUENTIAL_ACCESS_WAL = "org.apache.nifi.wali.EncryptedSequentialAccessWriteAheadLog";
private static final String MINIMAL_LOCKING_WALI = "org.wali.MinimalLockingWriteAheadLog";
private static final String DEFAULT_WAL_IMPLEMENTATION = SEQUENTIAL_ACCESS_WAL;
+ private static final int DEFAULT_CACHE_SIZE = 10_000_000;
- final String walImplementation;
+ private final String walImplementation;
protected final NiFiProperties nifiProperties;
- final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
+ private final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
private final boolean alwaysSync;
private final boolean retainOrphanedFlowFiles;
private static final Logger logger = LoggerFactory.getLogger(WriteAheadFlowFileRepository.class);
volatile ScheduledFuture<?> checkpointFuture;
- final long checkpointDelayMillis;
+ private final long checkpointDelayMillis;
private final List<File> flowFileRepositoryPaths = new ArrayList<>();
- final List<File> recoveryFiles = new ArrayList<>();
- final ScheduledExecutorService checkpointExecutor;
+ private final List<File> recoveryFiles = new ArrayList<>();
+ private final ScheduledExecutorService checkpointExecutor;
+ private final int maxCharactersToCache;
private volatile Collection<SerializedRepositoryRecord> recoveredRecords = null;
private final Set<ResourceClaim> orphanedResourceClaims = Collections.synchronizedSet(new HashSet<>());
@@ -116,6 +122,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
private WriteAheadRepository<SerializedRepositoryRecord> wal;
private RepositoryRecordSerdeFactory serdeFactory;
private ResourceClaimManager claimManager;
+ private FieldCache fieldCache;
// WALI Provides the ability to register callbacks for when a Partition or the entire Repository is sync'ed with the underlying disk.
// We keep track of this because we need to ensure that the ContentClaims are destroyed only after the FlowFile Repository has been
@@ -150,6 +157,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
walImplementation = null;
nifiProperties = null;
retainOrphanedFlowFiles = true;
+ maxCharactersToCache = 0;
}
public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
@@ -165,6 +173,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
writeAheadLogImpl = DEFAULT_WAL_IMPLEMENTATION;
}
this.walImplementation = writeAheadLogImpl;
+ this.maxCharactersToCache = nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
// We used to use one implementation (minimal locking) of the write-ahead log, but we now want to use the other
// (sequential access), we must address this. Since the MinimalLockingWriteAheadLog supports multiple partitions,
@@ -202,11 +211,25 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
@Override
public void initialize(final ResourceClaimManager claimManager) throws IOException {
- initialize(claimManager, new StandardRepositoryRecordSerdeFactory(claimManager));
+ final FieldCache fieldCache = new CaffeineFieldCache(maxCharactersToCache);
+ initialize(claimManager, createSerdeFactory(claimManager, fieldCache), fieldCache);
}
- public void initialize(final ResourceClaimManager claimManager, final RepositoryRecordSerdeFactory serdeFactory) throws IOException {
+ protected RepositoryRecordSerdeFactory createSerdeFactory(final ResourceClaimManager claimManager, final FieldCache fieldCache) {
+ if (EncryptedSequentialAccessWriteAheadLog.class.getName().equals(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_WAL_IMPLEMENTATION))) {
+ try {
+ return new EncryptedRepositoryRecordSerdeFactory(claimManager, nifiProperties, fieldCache);
+ } catch (final EncryptionException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ return new StandardRepositoryRecordSerdeFactory(claimManager, fieldCache);
+ }
+ }
+
+ public void initialize(final ResourceClaimManager claimManager, final RepositoryRecordSerdeFactory serdeFactory, final FieldCache fieldCache) throws IOException {
this.claimManager = claimManager;
+ this.fieldCache = fieldCache;
for (final File file : flowFileRepositoryPaths) {
Files.createDirectories(file.toPath());
@@ -868,6 +891,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
}
}
+ fieldCache.clear();
+
final Map<String, FlowFileQueue> queueMap = new HashMap<>();
for (final FlowFileQueue queue : queueProvider.getAllQueues()) {
queueMap.put(queue.getIdentifier(), queue);
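Editorial sketch (not part of the commit): the hunks above thread a FieldCache through repository recovery, size it from nifi.flowfile.repository.wal.cache.characters, and clear it once recovery completes. The idea can be illustrated roughly as follows, assuming the cache maps each decoded string to one shared instance and stops admitting entries once a character budget is spent. The class BoundedStringCache and its method canonicalize are hypothetical names used only for illustration. The CaffeineFieldCache added by the commit is not shown in this part of the diff, and its eviction behavior may differ from this simple "stop caching when full" policy.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; this class is not part of NiFi or of this commit.
final class BoundedStringCache {
    private final ConcurrentMap<String, String> canonical = new ConcurrentHashMap<>();
    private final AtomicLong cachedCharacters = new AtomicLong();
    private final long maxCharacters;

    BoundedStringCache(final long maxCharacters) {
        this.maxCharacters = maxCharacters;
    }

    // Returns a shared instance equal to 'value' when one is cached,
    // so that many equal strings read from disk occupy heap only once.
    String canonicalize(final String value) {
        if (value == null) {
            return null;
        }

        final String existing = canonical.get(value);
        if (existing != null) {
            return existing;
        }

        // Reserve room in the character budget; if the budget is exceeded,
        // release the reservation and hand back the caller's own instance.
        final long total = cachedCharacters.addAndGet(value.length());
        if (total > maxCharacters) {
            cachedCharacters.addAndGet(-value.length());
            return value;
        }

        // Another thread may have cached an equal string in the meantime.
        final String previous = canonical.putIfAbsent(value, value);
        if (previous != null) {
            cachedCharacters.addAndGet(-value.length());
            return previous;
        }
        return value;
    }

    // Drops all cached strings, e.g. once recovery has finished.
    // A sketch: callers racing with clear() may leave the counter slightly off.
    void clear() {
        canonical.clear();
        cachedCharacters.set(0);
    }
}
```

In this sketch, two equal attribute values read from separate records resolve to the same String object while the budget lasts, and clear() releases the references so the cache itself holds no heap after recovery.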
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java
index 88e1415..f184a66 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/swap/SchemaSwapDeserializer.java
@@ -30,6 +30,8 @@ import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.schema.FlowFileRecordFieldMap;
import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldCache;
+import org.apache.nifi.repository.schema.NoOpFieldCache;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordField;
import org.apache.nifi.repository.schema.RecordSchema;
@@ -37,12 +39,21 @@ import org.apache.nifi.repository.schema.Repetition;
import org.apache.nifi.repository.schema.SchemaRecordReader;
public class SchemaSwapDeserializer implements SwapDeserializer {
+ private final FieldCache fieldCache;
+
+ public SchemaSwapDeserializer() {
+ this(new NoOpFieldCache());
+ }
+
+ public SchemaSwapDeserializer(final FieldCache fieldCache) {
+ this.fieldCache = fieldCache;
+ }
@Override
@SuppressWarnings("unchecked")
public SwapContents deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
final RecordSchema schema = RecordSchema.readFrom(in);
- final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema);
+ final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema, fieldCache);
final Record parentRecord = reader.readRecord(in);
final List<Record> flowFileRecords = (List<Record>) parentRecord.getFieldValue(SwapSchema.FLOWFILE_CONTENTS);
@@ -65,7 +76,7 @@ public class SchemaSwapDeserializer implements SwapDeserializer {
final RecordField summaryRecordField = new ComplexRecordField(SwapSchema.SWAP_SUMMARY, Repetition.EXACTLY_ONE, summaryFields);
final RecordSchema summarySchema = new RecordSchema(Collections.singletonList(summaryRecordField));
- final Record summaryRecordParent = SchemaRecordReader.fromSchema(summarySchema).readRecord(in);
+ final Record summaryRecordParent = SchemaRecordReader.fromSchema(summarySchema, fieldCache).readRecord(in);
final Record summaryRecord = (Record) summaryRecordParent.getFieldValue(SwapSchema.SWAP_SUMMARY);
final SwapSummary swapSummary = SwapSummaryFieldMap.getSwapSummary(summaryRecord, claimManager);
return swapSummary;
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/encrypt/StringEncryptorIT.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/encrypt/StringEncryptorIT.groovy
index 8e4428c..b5477da 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/encrypt/StringEncryptorIT.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/encrypt/StringEncryptorIT.groovy
@@ -71,9 +71,6 @@ class StringEncryptorIT {
final String PASSWORD = "nifiPassword123"
final String plaintext = "some sensitive flow value"
- final long SLOW_DURATION_NANOS = 500 * 1000 * 1000 // 500 ms
- final long FAST_DURATION_NANOS = 1 * 1000 * 1000 // 1 ms
-
int testIterations = 100 //_000
def results = []
@@ -111,14 +108,5 @@ class StringEncryptorIT {
def milliDurations = [resultDurations.min(), resultDurations.max(), resultDurations.sum() / resultDurations.size()].collect { it / 1_000_000 }
logger.info("Min/Max/Avg durations in ms: ${milliDurations}")
-
- // Assert
-
- // The initial creation (including key derivation) should be slow
- assert createNanos > SLOW_DURATION_NANOS
-
- // The encryption/decryption process (repeated) should be fast
- assert resultDurations.max() <= FAST_DURATION_NANOS * 3
- assert resultDurations.sum() / testIterations < FAST_DURATION_NANOS
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy
index 8bc91db..03faec6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/wali/EncryptedSequentialAccessWriteAheadLogTest.groovy
@@ -31,6 +31,7 @@ import org.apache.nifi.controller.repository.StandardRepositoryRecord
import org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory
import org.apache.nifi.controller.repository.claim.ResourceClaimManager
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager
+import org.apache.nifi.repository.schema.NoOpFieldCache
import org.apache.nifi.security.kms.CryptoUtils
import org.apache.nifi.security.repository.config.FlowFileRepositoryEncryptionConfiguration
import org.bouncycastle.jce.provider.BouncyCastleProvider
@@ -105,7 +106,7 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase {
flowFileQueue = createAndRegisterMockQueue(TEST_QUEUE_IDENTIFIER)
byteArrayOutputStream = new ByteArrayOutputStream()
dataOutputStream = new DataOutputStream(byteArrayOutputStream)
- wrappedSerDe = new SchemaRepositoryRecordSerde(claimManager)
+ wrappedSerDe = new SchemaRepositoryRecordSerde(claimManager, new NoOpFieldCache())
flowFileREC = new FlowFileRepositoryEncryptionConfiguration(KPI, KPL, KEY_ID, KEYS, REPO_IMPL)
@@ -192,7 +193,7 @@ class EncryptedSequentialAccessWriteAheadLogTest extends GroovyTestCase {
testLogger.setLevel(Level.INFO)
final List<SerializedRepositoryRecord> records = new ArrayList<>()
- 100_000.times { int i ->
+ 10_000.times { int i ->
def attributes = [name: "User ${i}" as String, age: "${i}" as String]
final SerializedRepositoryRecord record = buildCreateRecord(flowFileQueue, attributes)
records.add(record)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
index c306db1..543d94b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java
@@ -20,6 +20,7 @@ package org.apache.nifi.controller.repository;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
+import org.apache.nifi.repository.schema.NoOpFieldCache;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -49,7 +50,7 @@ public class SchemaRepositoryRecordSerdeTest {
@Before
public void setup() {
resourceClaimManager = new StandardResourceClaimManager();
- schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager);
+ schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager, new NoOpFieldCache());
flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER);
byteArrayOutputStream = new ByteArrayOutputStream();
dataOutputStream = new DataOutputStream(byteArrayOutputStream);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/flowfilerepo/OOMEWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/flowfilerepo/OOMEWriteAheadFlowFileRepository.java
index 320db25..2b55700 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/flowfilerepo/OOMEWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/flowfilerepo/OOMEWriteAheadFlowFileRepository.java
@@ -21,6 +21,8 @@ import org.apache.nifi.controller.repository.SerializedRepositoryRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecordSerdeFactory;
import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.repository.schema.FieldCache;
+import org.apache.nifi.repository.schema.NoOpFieldCache;
import org.apache.nifi.util.NiFiProperties;
import org.wali.SerDe;
import org.wali.UpdateType;
@@ -41,7 +43,8 @@ public class OOMEWriteAheadFlowFileRepository extends WriteAheadFlowFileReposito
@Override
public void initialize(final ResourceClaimManager claimManager) throws IOException {
- super.initialize(claimManager, new ThrowOOMERepositoryRecordSerdeFactory(new StandardRepositoryRecordSerdeFactory(claimManager)));
+ final FieldCache fieldCache = new NoOpFieldCache();
+ super.initialize(claimManager, new ThrowOOMERepositoryRecordSerdeFactory(new StandardRepositoryRecordSerdeFactory(claimManager, fieldCache)), fieldCache);
}
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java
index bb43ba8..d1e5b3c 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordReader.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
import org.apache.nifi.provenance.schema.EventRecord;
import org.apache.nifi.provenance.serialization.CompressableRecordReader;
import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.repository.schema.NoOpFieldCache;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordSchema;
import org.apache.nifi.repository.schema.SchemaRecordReader;
@@ -62,7 +63,7 @@ public class ByteArraySchemaRecordReader extends CompressableRecordReader {
schema = RecordSchema.readFrom(bais);
}
- recordReader = SchemaRecordReader.fromSchema(schema);
+ recordReader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache());
}
@Override
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java
index bd85846..797402e 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
import org.apache.nifi.provenance.schema.LookupTableEventRecord;
import org.apache.nifi.provenance.serialization.CompressableRecordReader;
import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.repository.schema.NoOpFieldCache;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordSchema;
import org.apache.nifi.repository.schema.SchemaRecordReader;
@@ -100,7 +101,7 @@ public class EventIdFirstSchemaRecordReader extends CompressableRecordReader {
schema = RecordSchema.readFrom(bais);
}
- recordReader = SchemaRecordReader.fromSchema(schema);
+ recordReader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache());
final int headerSchemaLength = in.readInt();
final byte[] headerSchemaBuffer = new byte[headerSchemaLength];
@@ -111,7 +112,7 @@ public class EventIdFirstSchemaRecordReader extends CompressableRecordReader {
headerSchema = RecordSchema.readFrom(bais);
}
- final SchemaRecordReader headerReader = SchemaRecordReader.fromSchema(headerSchema);
+ final SchemaRecordReader headerReader = SchemaRecordReader.fromSchema(headerSchema, new NoOpFieldCache());
final Record headerRecord = headerReader.readRecord(in);
componentIds = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.COMPONENT_IDS);
componentTypes = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.COMPONENT_TYPES);