You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/10/13 16:04:00 UTC
[3/5] nifi git commit: NIFI-730: Implemented swapping in and out
on-demand by the FlowFileQueue rather than in a background thread
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
new file mode 100644
index 0000000..66f32d8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.nifi.connectable.Connectable;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestStandardFlowFileQueue {
+ private TestSwapManager swapManager = null;
+ private StandardFlowFileQueue queue = null;
+
+ @Before
+ public void setup() { // builds a fresh queue over Mockito mocks and an in-memory swap manager before each test
+ final Connection connection = Mockito.mock(Connection.class);
+ Mockito.when(connection.getSource()).thenReturn(Mockito.mock(Connectable.class));
+ Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+
+ final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+ swapManager = new TestSwapManager(); // new instance per test, so its call counters start at 0
+
+ final FlowFileRepository flowFileRepo = Mockito.mock(FlowFileRepository.class);
+ final ProvenanceEventRepository provRepo = Mockito.mock(ProvenanceEventRepository.class);
+ final ResourceClaimManager claimManager = Mockito.mock(ResourceClaimManager.class);
+ // null is passed where StandardConnection passes its event reporter; 10000 where it passes the queue swap threshold
+ queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000);
+ TestFlowFile.idGenerator.set(0L); // restart TestFlowFile ids at 0 for every test
+ }
+
+
+ @Test
+ public void testSwapOutOccurs() { // expects no swapOut call for the first 19,999 FlowFiles and exactly one on the 20,000th
+ for (int i = 0; i < 10000; i++) { // first 10,000 puts; each default TestFlowFile is 1 byte, so object and byte counts advance together
+ queue.put(new TestFlowFile());
+ assertEquals(0, swapManager.swapOutCalledCount);
+ assertEquals(i + 1, queue.size().getObjectCount());
+ assertEquals(i + 1, queue.size().getByteCount());
+ }
+
+ for (int i = 0; i < 9999; i++) { // the next 9,999 puts must still not call swapOut
+ queue.put(new TestFlowFile());
+ assertEquals(0, swapManager.swapOutCalledCount);
+ assertEquals(i + 10001, queue.size().getObjectCount());
+ assertEquals(i + 10001, queue.size().getByteCount());
+ }
+
+ queue.put(new TestFlowFile(1000)); // the 20,000th FlowFile is 1000 bytes
+ assertEquals(1, swapManager.swapOutCalledCount);
+ assertEquals(20000, queue.size().getObjectCount());
+ assertEquals(20999, queue.size().getByteCount()); // 19,999 * 1 byte + 1000 bytes
+
+ assertEquals(10000, queue.getActiveQueueSize().getObjectCount()); // size() still reports 20,000, but only 10,000 are expected in the active queue
+ }
+
+ @Test
+ public void testLowestPrioritySwappedOutFirst() { // expects the FlowFiles left in the active queue after a swap-out to be the ones the prioritizer ranks first
+ final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+ prioritizers.add(new FlowFileSizePrioritizer());
+ queue.setPriorities(prioritizers);
+
+ long maxSize = 20000;
+ for (int i = 1; i <= 20000; i++) { // sizes run from 19,999 down to 0
+ queue.put(new TestFlowFile(maxSize - i));
+ }
+
+ assertEquals(1, swapManager.swapOutCalledCount);
+ assertEquals(20000, queue.size().getObjectCount());
+
+ assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+ final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
+ assertEquals(10000, flowFiles.size()); // the test expects only 10,000 records back even though Integer.MAX_VALUE was requested
+ for (int i = 0; i < 10000; i++) {
+ assertEquals(i, flowFiles.get(i).getSize()); // expected order is ascending size, 0 through 9999
+ }
+ }
+
+ @Test
+ public void testSwapIn() { // expects swapIn not to be called until the active queue is empty and one more poll is made
+ for (int i = 1; i <= 20000; i++) {
+ queue.put(new TestFlowFile());
+ }
+
+ assertEquals(1, swapManager.swappedOut.size()); // one swap location after 20,000 puts
+ queue.put(new TestFlowFile());
+ assertEquals(1, swapManager.swappedOut.size()); // the 20,001st put is expected not to create a second swap location
+
+ final Set<FlowFileRecord> exp = new HashSet<>();
+ for (int i = 0; i < 9999; i++) { // poll 9,999 times, leaving one FlowFile in the active queue
+ assertNotNull(queue.poll(exp));
+ }
+
+ assertEquals(0, swapManager.swapInCalledCount);
+ assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+ assertNotNull(queue.poll(exp));
+
+ assertEquals(0, swapManager.swapInCalledCount); // emptying the active queue alone must not call swapIn
+ assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+
+ assertEquals(1, swapManager.swapOutCalledCount);
+
+ assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
+ assertEquals(1, swapManager.swapInCalledCount);
+ assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+
+ assertTrue(swapManager.swappedOut.isEmpty()); // TestSwapManager.swapIn removes the location from its map
+
+ queue.poll(exp); // NOTE(review): the result of this final poll is not asserted
+
+ }
+
+
+ private class TestSwapManager implements FlowFileSwapManager { // in-memory swap manager: keeps swapped-out FlowFiles in a map and counts swapOut/swapIn calls
+ private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>(); // swap location -> FlowFiles stored under it
+ int swapOutCalledCount = 0; // incremented on every swapOut call
+ int swapInCalledCount = 0; // incremented on every swapIn call
+
+
+ @Override
+ public void initialize(final SwapManagerInitializationContext initializationContext) {
+ // intentionally empty: the initialization context is not used
+ }
+
+ @Override
+ public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
+ swapOutCalledCount++;
+ final String location = UUID.randomUUID().toString(); // a random UUID string serves as the swap location
+ swappedOut.put(location, new ArrayList<FlowFileRecord>(flowFiles)); // stores a copy of the caller's list
+ return location;
+ }
+
+ @Override
+ public List<FlowFileRecord> peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
+ return new ArrayList<FlowFileRecord>(swappedOut.get(swapLocation)); // copy of the stored list, which stays in the map; an unknown location passes null to the ArrayList constructor
+ }
+
+ @Override
+ public List<FlowFileRecord> swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
+ swapInCalledCount++;
+ return swappedOut.remove(swapLocation); // removes the entry; null when the location is not in the map
+ }
+
+ @Override
+ public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
+ return new ArrayList<String>(swappedOut.keySet()); // every location currently held; flowFileQueue is not consulted
+ }
+
+ @Override
+ public void dropSwappedFlowFiles(String swapLocation, final FlowFileQueue flowFileQueue, String user) {
+ // intentionally empty: swappedOut is left unchanged
+ }
+
+ @Override
+ public QueueSize getSwapSize(String swapLocation) throws IOException {
+ final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+ if (flowFiles == null) {
+ return new QueueSize(0, 0L); // an unknown location is reported as an empty size
+ }
+
+ int count = 0;
+ long size = 0L;
+ for (final FlowFileRecord flowFile : flowFiles) { // tally record count and the sum of getSize()
+ count++;
+ size += flowFile.getSize();
+ }
+
+ return new QueueSize(count, size);
+ }
+
+ @Override
+ public Long getMaxRecordId(String swapLocation) throws IOException {
+ final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
+ if (flowFiles == null) {
+ return null; // unknown location
+ }
+
+ Long max = null; // stays null when the stored list is empty
+ for (final FlowFileRecord flowFile : flowFiles) {
+ if (max == null || flowFile.getId() > max) {
+ max = flowFile.getId();
+ }
+ }
+
+ return max;
+ }
+
+ @Override
+ public void purge() {
+ swappedOut.clear(); // forget every swap location
+ }
+ }
+
+
+ private static class TestFlowFile implements FlowFileRecord { // minimal FlowFileRecord whose id, entry date, attributes and size are fixed at construction
+ private static final AtomicLong idGenerator = new AtomicLong(0L); // shared id source; setup() resets it to 0
+
+ private final long id = idGenerator.getAndIncrement(); // ids follow construction order
+ private final long entryDate = System.currentTimeMillis();
+ private final Map<String, String> attributes;
+ private final long size;
+
+ public TestFlowFile() {
+ this(1L); // default size is 1
+ }
+
+ public TestFlowFile(final long size) {
+ this(new HashMap<String, String>(), size); // no attributes
+ }
+
+ public TestFlowFile(final Map<String, String> attributes, final long size) {
+ this.attributes = attributes; // the caller's map is stored as-is, not copied
+ this.size = size;
+ }
+
+
+ @Override
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public long getEntryDate() {
+ return entryDate;
+ }
+
+ @Override
+ public long getLineageStartDate() {
+ return entryDate; // same value as the entry date
+ }
+
+ @Override
+ public Long getLastQueueDate() {
+ return null; // always null in this test double
+ }
+
+ @Override
+ public Set<String> getLineageIdentifiers() {
+ return Collections.emptySet(); // always empty
+ }
+
+ @Override
+ public boolean isPenalized() {
+ return false; // never penalized
+ }
+
+ @Override
+ public String getAttribute(String key) {
+ return attributes.get(key);
+ }
+
+ @Override
+ public long getSize() {
+ return size;
+ }
+
+ @Override
+ public Map<String, String> getAttributes() {
+ return Collections.unmodifiableMap(attributes); // read-only view of the stored map
+ }
+
+ @Override
+ public int compareTo(final FlowFile o) {
+ return Long.compare(id, o.getId()); // orders by id
+ }
+
+ @Override
+ public long getPenaltyExpirationMillis() {
+ return 0; // fixed value
+ }
+
+ @Override
+ public ContentClaim getContentClaim() {
+ return null; // no content claim is attached
+ }
+
+ @Override
+ public long getContentClaimOffset() {
+ return 0; // fixed value
+ }
+ }
+
+ private static class FlowFileSizePrioritizer implements FlowFilePrioritizer { // compares FlowFiles by size only
+ @Override
+ public int compare(final FlowFile o1, final FlowFile o2) {
+ return Long.compare(o1.getSize(), o2.getSize()); // negative when o1 is smaller; testLowestPrioritySwappedOutFirst expects smaller FlowFiles to be polled first
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index f0a6d8a..d43a3db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -32,11 +32,14 @@ import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.StandardFlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.util.NiFiProperties;
/**
@@ -66,7 +69,8 @@ public final class StandardConnection implements Connection {
destination = new AtomicReference<>(builder.destination);
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
scheduler = builder.scheduler;
- flowFileQueue = new StandardFlowFileQueue(id, this, scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
+ flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
+ scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
}
@@ -262,6 +266,9 @@ public final class StandardConnection implements Connection {
private Collection<Relationship> relationships;
private FlowFileSwapManager swapManager;
private EventReporter eventReporter;
+ private FlowFileRepository flowFileRepository;
+ private ProvenanceEventRepository provenanceRepository;
+ private ResourceClaimManager resourceClaimManager;
public Builder(final ProcessScheduler scheduler) {
this.scheduler = scheduler;
@@ -318,6 +325,21 @@ public final class StandardConnection implements Connection {
return this;
}
+ public Builder flowFileRepository(final FlowFileRepository flowFileRepository) { // required: build() throws IllegalStateException while this is null
+ this.flowFileRepository = flowFileRepository;
+ return this; // returns this Builder so calls can be chained
+ }
+
+ public Builder provenanceRepository(final ProvenanceEventRepository provenanceRepository) { // required: build() throws IllegalStateException while this is null
+ this.provenanceRepository = provenanceRepository;
+ return this; // returns this Builder so calls can be chained
+ }
+
+ public Builder resourceClaimManager(final ResourceClaimManager resourceClaimManager) { // required: build() throws IllegalStateException while this is null
+ this.resourceClaimManager = resourceClaimManager;
+ return this; // returns this Builder so calls can be chained
+ }
+
public StandardConnection build() {
if (source == null) {
throw new IllegalStateException("Cannot build a Connection without a Source");
@@ -328,6 +350,15 @@ public final class StandardConnection implements Connection {
if (swapManager == null) {
throw new IllegalStateException("Cannot build a Connection without a FlowFileSwapManager");
}
+ if (flowFileRepository == null) {
+ throw new IllegalStateException("Cannot build a Connection without a FlowFile Repository");
+ }
+ if (provenanceRepository == null) {
+ throw new IllegalStateException("Cannot build a Connection without a Provenance Repository");
+ }
+ if (resourceClaimManager == null) {
+ throw new IllegalStateException("Cannot build a Connection without a Resource Claim Manager");
+ }
if (relationships == null) {
relationships = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
index 7ab56ed..c4a86f2 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java
@@ -28,6 +28,7 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
@@ -79,7 +80,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
private EventReporter eventReporter;
private ResourceClaimManager claimManager;
-
public FileSystemSwapManager() {
final NiFiProperties properties = NiFiProperties.getInstance();
final Path flowFileRepoPath = properties.getFlowFileRepositoryPath();
@@ -111,6 +111,10 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
try (final FileOutputStream fos = new FileOutputStream(swapTempFile)) {
serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
fos.getFD().sync();
+ } catch (final IOException ioe) {
+ // we failed to write out the entire swap file. Delete the temporary file, if we can.
+ swapTempFile.delete();
+ throw ioe;
}
if (swapTempFile.renameTo(swapFile)) {
@@ -133,25 +137,6 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
warn("Swapped in FlowFiles from file " + swapFile.getAbsolutePath() + " but failed to delete the file; this file should be cleaned up manually");
}
- // TODO: When FlowFile Queue performs this operation, it needs to take the following error handling logic into account:
-
- /*
- * } catch (final EOFException eof) {
- * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile);
- *
- * if (!swapFile.delete()) {
- * warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually");
- * }
- * } catch (final FileNotFoundException fnfe) {
- * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile);
- * } catch (final Exception e) {
- * error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e);
- *
- * if (swapFile != null) {
- * queue.add(swapFile);
- * }
- * }
- */
return swappedFlowFiles;
}
@@ -165,7 +150,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
final List<FlowFileRecord> swappedFlowFiles;
try (final InputStream fis = new FileInputStream(swapFile);
final DataInputStream in = new DataInputStream(fis)) {
- swappedFlowFiles = deserializeFlowFiles(in, flowFileQueue, swapLocation, claimManager);
+ swappedFlowFiles = deserializeFlowFiles(in, swapLocation, flowFileQueue, claimManager);
}
return swappedFlowFiles;
@@ -189,6 +174,12 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
@Override
+ public void dropSwappedFlowFiles(final String swapLocation, final FlowFileQueue flowFileQueue, final String user) throws IOException {
+ // NOTE(review): empty body - nothing is dropped or deleted here; confirm whether the implementation is still pending
+ }
+
+
+ @Override
public List<String> recoverSwapLocations(final FlowFileQueue flowFileQueue) throws IOException {
final File[] swapFiles = storageDirectory.listFiles(new FilenameFilter() {
@Override
@@ -322,7 +313,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
- public int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
+ public static int serializeFlowFiles(final List<FlowFileRecord> toSwap, final FlowFileQueue queue, final String swapLocation, final OutputStream destination) throws IOException {
if (toSwap == null || toSwap.isEmpty()) {
return 0;
}
@@ -396,8 +387,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
return toSwap.size();
}
- private void writeString(final String toWrite, final OutputStream out) throws IOException {
- final byte[] bytes = toWrite.getBytes("UTF-8");
+ private static void writeString(final String toWrite, final OutputStream out) throws IOException {
+ final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
final int utflen = bytes.length;
if (utflen < 65535) {
@@ -415,26 +406,29 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
}
- static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final FlowFileQueue queue, final String swapLocation, final ResourceClaimManager claimManager) throws IOException {
+ static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final String swapLocation, final FlowFileQueue queue, final ResourceClaimManager claimManager) throws IOException {
final int swapEncodingVersion = in.readInt();
if (swapEncodingVersion > SWAP_ENCODING_VERSION) {
throw new IOException("Cannot swap FlowFiles in from SwapFile because the encoding version is "
+ swapEncodingVersion + ", which is too new (expecting " + SWAP_ENCODING_VERSION + " or less)");
}
- final String connectionId = in.readUTF();
+ final String connectionId = in.readUTF(); // Connection ID
if (!connectionId.equals(queue.getIdentifier())) {
- throw new IllegalArgumentException("Cannot restore contents from FlowFile Swap File " + swapLocation +
- " because the file indicates that records belong to Connection with ID " + connectionId + " but attempted to swap those records into " + queue);
+ throw new IllegalArgumentException("Cannot deserialize FlowFiles from Swap File at location " + swapLocation +
+ " because those FlowFiles belong to Connection with ID " + connectionId + " and an attempt was made to swap them into a Connection with ID " + queue.getIdentifier());
}
final int numRecords = in.readInt();
in.readLong(); // Content Size
+ if (swapEncodingVersion > 7) {
+ in.readLong(); // Max Record ID
+ }
return deserializeFlowFiles(in, numRecords, swapEncodingVersion, false, claimManager);
}
- static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles,
+ private static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles,
final int serializationVersion, final boolean incrementContentClaims, final ResourceClaimManager claimManager) throws IOException {
final List<FlowFileRecord> flowFiles = new ArrayList<>();
for (int i = 0; i < numFlowFiles; i++) {
@@ -543,7 +537,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager {
}
final byte[] bytes = new byte[numBytes];
fillBuffer(in, bytes, numBytes);
- return new String(bytes, "UTF-8");
+ return new String(bytes, StandardCharsets.UTF_8);
}
private static Integer readFieldLength(final InputStream in) throws IOException {
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 23746ce..20f2642 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -286,7 +286,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
private final NodeProtocolSender protocolSender;
private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks");
- private final ResourceClaimManager contentClaimManager = new StandardResourceClaimManager();
+ private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
// guarded by rwLock
/**
@@ -393,7 +393,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
- final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, contentClaimManager);
+ final FlowFileRepository flowFileRepo = createFlowFileRepository(properties, resourceClaimManager);
flowFileRepository = flowFileRepo;
flowFileEventRepository = flowFileEventRepo;
counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository());
@@ -668,7 +668,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
try {
final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(implementationClassName, ContentRepository.class);
synchronized (contentRepo) {
- contentRepo.initialize(contentClaimManager);
+ contentRepo.initialize(resourceClaimManager);
}
return contentRepo;
} catch (final Exception e) {
@@ -728,11 +728,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// Create and initialize a FlowFileSwapManager for this connection
final FlowFileSwapManager swapManager = createSwapManager(properties);
final EventReporter eventReporter = createEventReporter(getBulletinRepository());
+
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
- return getResourceClaimManager();
+ return resourceClaimManager;
}
@Override
@@ -756,6 +757,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
.destination(destination)
.swapManager(swapManager)
.eventReporter(eventReporter)
+ .resourceClaimManager(resourceClaimManager)
+ .flowFileRepository(flowFileRepository)
+ .provenanceRepository(provenanceEventRepository)
.build();
}
@@ -3188,7 +3192,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new IllegalArgumentException("Input Content Claim not specified");
}
- final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
provEvent.getPreviousContentClaimIdentifier(), false);
claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
@@ -3198,7 +3202,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new IllegalArgumentException("Output Content Claim not specified");
}
- final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
provEvent.getContentClaimIdentifier(), false);
claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
@@ -3247,7 +3251,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
try {
- final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
if (!contentRepository.isAccessible(contentClaim)) {
@@ -3327,17 +3331,17 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
// Create the ContentClaim
- final ResourceClaim resourceClaim = contentClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
+ final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false);
// Increment Claimant Count, since we will now be referencing the Content Claim
- contentClaimManager.incrementClaimantCount(resourceClaim);
+ resourceClaimManager.incrementClaimantCount(resourceClaim);
final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
if (!contentRepository.isAccessible(contentClaim)) {
- contentClaimManager.decrementClaimantCount(resourceClaim);
+ resourceClaimManager.decrementClaimantCount(resourceClaim);
throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a32a485..cfbb770 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -81,9 +81,11 @@ import org.slf4j.LoggerFactory;
* <p>
* Provides a ProcessSession that ensures all accesses, changes and transfers
* occur in an atomic manner for all FlowFiles including their contents and
- * attributes</p>
+ * attributes
+ * </p>
* <p>
- * NOT THREAD SAFE</p>
+ * NOT THREAD SAFE
+ * </p>
* <p/>
*/
public final class StandardProcessSession implements ProcessSession, ProvenanceEventEnricher {
@@ -104,7 +106,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<String, Long> globalCounters = new HashMap<>();
private final Map<ContentClaim, ByteCountingOutputStream> appendableStreams = new HashMap<>();
private final ProcessContext context;
- private final Set<FlowFile> recursionSet = new HashSet<>();//set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
+ private final Set<FlowFile> recursionSet = new HashSet<>();// set used to track what is currently being operated on to prevent logic failures if recursive calls occurring
private final Set<Path> deleteOnCommit = new HashSet<>();
private final long sessionId;
private final String connectableDescription;
@@ -114,7 +116,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final StandardProvenanceReporter provenanceReporter;
- private int removedCount = 0; // number of flowfiles removed in this session
+ private int removedCount = 0; // number of flowfiles removed in this session
private long removedBytes = 0L; // size of all flowfiles removed in this session
private final LongHolder bytesRead = new LongHolder(0L);
private final LongHolder bytesWritten = new LongHolder(0L);
@@ -169,7 +171,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType,
- context.getProvenanceRepository(), this);
+ context.getProvenanceRepository(), this);
this.sessionId = idGenerator.getAndIncrement();
this.connectableDescription = description;
@@ -196,7 +198,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// Processor-reported events.
List<ProvenanceEventRecord> autoTerminatedEvents = null;
- //validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
+ // validate that all records have a transfer relationship for them and if so determine the destination node and clone as necessary
final Map<FlowFileRecord, StandardRepositoryRecord> toAdd = new HashMap<>();
for (final StandardRepositoryRecord record : records.values()) {
if (record.isMarkedForDelete()) {
@@ -235,11 +237,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
} else {
- final Connection finalDestination = destinations.remove(destinations.size() - 1); //remove last element
+ final Connection finalDestination = destinations.remove(destinations.size() - 1); // remove last element
record.setDestination(finalDestination.getFlowFileQueue());
incrementConnectionInputCounts(finalDestination, record);
- for (final Connection destination : destinations) { //iterate over remaining destinations and "clone" as needed
+ for (final Connection destination : destinations) { // iterate over remaining destinations and "clone" as needed
incrementConnectionInputCounts(destination, record);
final FlowFileRecord currRec = record.getCurrent();
final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder().fromFlowFile(currRec);
@@ -256,7 +258,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (claim != null) {
context.getContentRepository().incrementClaimaintCount(claim);
}
- newRecord.setWorking(clone, Collections.<String, String>emptyMap());
+ newRecord.setWorking(clone, Collections.<String, String> emptyMap());
newRecord.setDestination(destination.getFlowFileQueue());
newRecord.setTransferRelationship(record.getTransferRelationship());
@@ -322,9 +324,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Connectable connectable = context.getConnectable();
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
- LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+ LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
} else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
- //records which have been updated - remove original if exists
+ // records which have been updated - remove original if exists
removeContent(record.getOriginalClaim());
}
}
@@ -356,7 +358,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
for (final StandardRepositoryRecord record : checkpoint.records.values()) {
if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
- continue; //these don't need to be transferred
+ continue; // these don't need to be transferred
}
// record.getCurrent() will return null if this record was created in this session --
// in this case, we just ignore it, and it will be cleaned up by clearing the records map.
@@ -390,7 +392,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (LOG.isInfoEnabled()) {
final String sessionSummary = summarizeEvents(checkpoint);
if (!sessionSummary.isEmpty()) {
- LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
+ LOG.info("{} for {}, committed the following events: {}", new Object[] {this, connectableDescription, sessionSummary});
}
}
@@ -611,9 +613,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
boolean creationEventRegistered = false;
if (registeredTypes != null) {
if (registeredTypes.contains(ProvenanceEventType.CREATE)
- || registeredTypes.contains(ProvenanceEventType.FORK)
- || registeredTypes.contains(ProvenanceEventType.JOIN)
- || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
+ || registeredTypes.contains(ProvenanceEventType.FORK)
+ || registeredTypes.contains(ProvenanceEventType.JOIN)
+ || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
creationEventRegistered = true;
}
}
@@ -747,7 +749,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
private StandardProvenanceEventRecord enrich(
- final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
+ final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (eventFlowFile != null) {
@@ -1039,7 +1041,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StringBuilder sb = new StringBuilder(512);
if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD
- || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
+ || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
if (numCreated > 0) {
sb.append("created ").append(numCreated).append(" FlowFiles, ");
}
@@ -1097,7 +1099,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private void formatNanos(final long nanos, final StringBuilder sb) {
final long seconds = nanos > 1000000000L ? nanos / 1000000000L : 0L;
- long millis = nanos > 1000000L ? nanos / 1000000L : 0L;;
+ long millis = nanos > 1000000L ? nanos / 1000000L : 0L;
+ ;
final long nanosLeft = nanos % 1000000L;
if (seconds > 0) {
@@ -1272,7 +1275,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
int flowFileCount = 0;
long byteCount = 0L;
for (final Connection conn : context.getPollableConnections()) {
- final QueueSize queueSize = conn.getFlowFileQueue().getActiveQueueSize();
+ final QueueSize queueSize = conn.getFlowFileQueue().size();
flowFileCount += queueSize.getObjectCount();
byteCount += queueSize.getByteCount();
}
@@ -1287,8 +1290,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
- .addAttributes(attrs)
- .build();
+ .addAttributes(attrs)
+ .build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, attrs);
records.put(fFile, record);
@@ -1324,7 +1327,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
context.getContentRepository().incrementClaimaintCount(claim);
}
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
- record.setWorking(clone, Collections.<String, String>emptyMap());
+ record.setWorking(clone, Collections.<String, String> emptyMap());
records.put(clone, record);
if (offset == 0L && size == example.getSize()) {
@@ -1637,7 +1640,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return;
}
- LOG.info("{} {} FlowFiles have expired and will be removed", new Object[]{this, flowFiles.size()});
+ LOG.info("{} {} FlowFiles have expired and will be removed", new Object[] {this, flowFiles.size()});
final List<RepositoryRecord> expiredRecords = new ArrayList<>(flowFiles.size());
final String processorType;
@@ -1650,7 +1653,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(),
- processorType, context.getProvenanceRepository(), this);
+ processorType, context.getProvenanceRepository(), this);
final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
for (final FlowFileRecord flowFile : flowFiles) {
@@ -1664,7 +1667,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
- LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+ LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife});
}
try {
@@ -1696,7 +1699,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
record.getContentClaimOffset() + claim.getOffset(), record.getSize());
}
- enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
+ enriched.setAttributes(record.getAttributes(), Collections.<String, String> emptyMap());
return enriched.build();
}
@@ -1780,9 +1783,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
- final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
- final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
- final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+ final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+ final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+ final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
// We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
// Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@@ -1853,7 +1856,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
try {
try (final OutputStream rawOut = contentRepo.write(newClaim);
- final OutputStream out = new BufferedOutputStream(rawOut)) {
+ final OutputStream out = new BufferedOutputStream(rawOut)) {
if (header != null && header.length > 0) {
out.write(header);
@@ -2070,10 +2073,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// the original claim if the record is "working" but the content has not been modified
// (e.g., in the case of attributes only were updated)
// In other words:
- // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
- // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because
- // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
- // because we will do that later, in the session.commit() and that would result in removing the original claim twice.
+ // If we modify the attributes of a FlowFile, and then we call record.getWorkingClaim(), this will
+ // return the same claim as record.getOriginalClaim(). So we cannot just remove the working claim because
+ // that may decrement the original claim (because the 2 claims are the same), and that's NOT what we want to do
+ // because we will do that later, in the session.commit() and that would result in removing the original claim twice.
if (contentModified) {
// In this case, it's ok to go ahead and destroy the content because we know that the working claim is going to be
// updated and the given working claim is referenced only by FlowFiles in this session (because it's the Working Claim).
@@ -2196,7 +2199,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public FlowFile importFrom(final Path source, final boolean keepSourceFile, final FlowFile destination) {
validateRecordState(destination);
- //TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true.
+ // TODO: find a better solution. With Windows 7 and Java 7 (very early update, at least), Files.isWritable(source.getParent()) returns false, even when it should be true.
if (!keepSourceFile && !Files.isWritable(source.getParent()) && !source.getParent().toFile().canWrite()) {
// If we do NOT want to keep the file, ensure that we can delete it, or else error.
throw new FlowFileAccessException("Cannot write to path " + source.getParent().toFile().getAbsolutePath() + " so cannot delete file; will not import.");
@@ -2228,9 +2231,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
removeTemporaryClaim(record);
final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
- .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
- .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
- .build();
+ .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
+ .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
+ .build();
record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
if (!keepSourceFile) {
deleteOnCommit.add(source);
@@ -2370,7 +2373,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
*
* @param flowFile the FlowFile to check
* @return <code>true</code> if the FlowFile is known in this session,
- * <code>false</code> otherwise.
+ * <code>false</code> otherwise.
*/
boolean isFlowFileKnown(final FlowFile flowFile) {
return records.containsKey(flowFile);
@@ -2392,8 +2395,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final String key = entry.getKey();
final String value = entry.getValue();
if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
- || CoreAttributes.DISCARD_REASON.key().equals(key)
- || CoreAttributes.UUID.key().equals(key)) {
+ || CoreAttributes.DISCARD_REASON.key().equals(key)
+ || CoreAttributes.UUID.key().equals(key)) {
continue;
}
newAttributes.put(key, value);
@@ -2441,10 +2444,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
- .addAttributes(newAttributes)
- .lineageIdentifiers(lineageIdentifiers)
- .lineageStartDate(lineageStartDate)
- .build();
+ .addAttributes(newAttributes)
+ .lineageIdentifiers(lineageIdentifiers)
+ .lineageStartDate(lineageStartDate)
+ .build();
final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
record.setWorking(fFile, newAttributes);
@@ -2465,7 +2468,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
*/
private static Map<String, String> intersectAttributes(final Collection<FlowFile> flowFileList) {
final Map<String, String> result = new HashMap<>();
- //trivial cases
+ // trivial cases
if (flowFileList == null || flowFileList.isEmpty()) {
return result;
} else if (flowFileList.size() == 1) {
@@ -2478,8 +2481,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
*/
final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
- outer:
- for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+ outer: for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
final String key = mapEntry.getKey();
final String value = mapEntry.getValue();
for (final FlowFile flowFile : flowFileList) {
@@ -2539,7 +2541,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Set<String> removedFlowFiles = new HashSet<>();
private final Set<String> createdFlowFiles = new HashSet<>();
- private int removedCount = 0; // number of flowfiles removed in this session
+ private int removedCount = 0; // number of flowfiles removed in this session
private long removedBytes = 0L; // size of all flowfiles removed in this session
private long bytesRead = 0L;
private long bytesWritten = 0L;
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
index c4d040b..3c4fcdb 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/util/Connectables.java
@@ -26,7 +26,7 @@ public class Connectables {
public static boolean flowFilesQueued(final Connectable connectable) {
for (final Connection conn : connectable.getIncomingConnections()) {
- if (!conn.getFlowFileQueue().isActiveQueueEmpty()) {
+ if (!conn.getFlowFileQueue().isEmpty()) {
return true;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 6eeddc5..f7191c5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -22,16 +22,26 @@ import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.flowfile.FlowFile;
import org.junit.Test;
import org.mockito.Mockito;
@@ -47,7 +57,7 @@ public class TestFileSystemSwapManager {
final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
- final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, flowFileQueue, "/src/test/resources/old-swap-file.swap", new NopResourceClaimManager());
+ final List<FlowFileRecord> records = FileSystemSwapManager.deserializeFlowFiles(in, "/src/test/resources/old-swap-file.swap", flowFileQueue, new NopResourceClaimManager());
assertEquals(10000, records.size());
for (final FlowFileRecord record : records) {
@@ -57,6 +67,53 @@ public class TestFileSystemSwapManager {
}
}
+    @Test
+    public void testRoundTripSerializeDeserialize() throws IOException {
+        // Writes 10,000 records to a swap file, reads them back, and checks that each record is unchanged.
+        final List<FlowFileRecord> toSwap = new ArrayList<>(10000);
+        for (int i = 0; i < 10000; i++) {
+            // Build a fresh map per record; a single shared map would leave every record reporting the last "i".
+            final Map<String, String> attrs = new HashMap<>();
+            attrs.put("i", String.valueOf(i));
+            toSwap.add(new TestFlowFile(attrs, i));
+        }
+
+        final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class);
+        Mockito.when(flowFileQueue.getIdentifier()).thenReturn("87bb99fe-412c-49f6-a441-d1b0af4e20b4");
+
+        final String swapLocation = "target/testRoundTrip.swap";
+        final File swapFile = new File(swapLocation);
+        Files.deleteIfExists(swapFile.toPath());
+
+        try (final FileOutputStream fos = new FileOutputStream(swapFile)) {
+            FileSystemSwapManager.serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos);
+        }
+
+        final List<FlowFileRecord> swappedIn;
+        try (final FileInputStream fis = new FileInputStream(swapFile);
+            final DataInputStream dis = new DataInputStream(fis)) {
+            swappedIn = FileSystemSwapManager.deserializeFlowFiles(dis, swapLocation, flowFileQueue, Mockito.mock(ResourceClaimManager.class));
+        }
+
+        assertEquals(toSwap.size(), swappedIn.size());
+        for (int i = 0; i < toSwap.size(); i++) {
+            final FlowFileRecord pre = toSwap.get(i);
+            final FlowFileRecord post = swappedIn.get(i);
+            // Compare the record that was written (pre) with the one read back (post), field by field.
+            assertEquals(pre.getSize(), post.getSize());
+            assertEquals(pre.getAttributes(), post.getAttributes());
+            assertEquals(pre.getId(), post.getId());
+            assertEquals(pre.getContentClaim(), post.getContentClaim());
+            assertEquals(pre.getContentClaimOffset(), post.getContentClaimOffset());
+            assertEquals(pre.getEntryDate(), post.getEntryDate());
+            assertEquals(pre.getLastQueueDate(), post.getLastQueueDate());
+            assertEquals(pre.getLineageIdentifiers(), post.getLineageIdentifiers());
+            assertEquals(pre.getLineageStartDate(), post.getLineageStartDate());
+            assertEquals(pre.getPenaltyExpirationMillis(), post.getPenaltyExpirationMillis());
+        }
+    }
+
+
public class NopResourceClaimManager implements ResourceClaimManager {
@Override
@@ -100,4 +157,87 @@ public class TestFileSystemSwapManager {
public void purge() {
}
}
+
+
+    private static class TestFlowFile implements FlowFileRecord {
+        // Minimal FlowFileRecord stub: no content claim, no lineage identifiers, and never penalized.
+        private static final AtomicLong idGenerator = new AtomicLong(0L);
+
+        private final long id = idGenerator.getAndIncrement();
+        private final long entryDate = System.currentTimeMillis();
+        private final long lastQueueDate = System.currentTimeMillis();
+        private final Map<String, String> attributes;
+        private final long size;
+
+        // Copies the given attributes so that later changes to the caller's map cannot alter this record.
+        public TestFlowFile(final Map<String, String> attributes, final long size) {
+            this.attributes = new HashMap<>(attributes);
+            this.size = size;
+        }
+
+        @Override
+        public long getId() {
+            return id;
+        }
+
+        @Override
+        public long getEntryDate() {
+            return entryDate;
+        }
+
+        @Override
+        public long getLineageStartDate() {
+            return entryDate; // this stub reports its entry date as the lineage start date
+        }
+
+        @Override
+        public Long getLastQueueDate() {
+            return lastQueueDate;
+        }
+
+        @Override
+        public Set<String> getLineageIdentifiers() {
+            return Collections.emptySet();
+        }
+
+        @Override
+        public boolean isPenalized() {
+            return false;
+        }
+
+        @Override
+        public String getAttribute(String key) {
+            return attributes.get(key);
+        }
+
+        @Override
+        public long getSize() {
+            return size;
+        }
+
+        @Override
+        public Map<String, String> getAttributes() {
+            return Collections.unmodifiableMap(attributes); // read-only view; callers cannot modify the record
+        }
+
+        @Override
+        public int compareTo(final FlowFile o) {
+            return Long.compare(id, o.getId()); // orders records by their generated id
+        }
+
+        @Override
+        public long getPenaltyExpirationMillis() {
+            return -1L;
+        }
+
+        @Override
+        public ContentClaim getContentClaim() {
+            return null;
+        }
+
+        @Override
+        public long getContentClaimOffset() {
+            return 0;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 12f8e5e..1783708 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -134,7 +134,7 @@ public class TestStandardProcessSession {
final ProcessScheduler processScheduler = Mockito.mock(ProcessScheduler.class);
final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class);
- flowFileQueue = new StandardFlowFileQueue("1", connection, processScheduler, swapManager, null, 10000);
+ flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000);
when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
Mockito.doAnswer(new Answer<Object>() {
http://git-wip-us.apache.org/repos/asf/nifi/blob/49a781df/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index 8bf5553..0e3bcac 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -37,6 +37,11 @@ import java.util.concurrent.TimeUnit;
import javax.ws.rs.WebApplicationException;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ClassUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.admin.service.UserService;
+import org.apache.nifi.authorization.DownloadAuthorization;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
@@ -47,9 +52,10 @@ import org.apache.nifi.controller.ContentAvailability;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.FlowFileQueue;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.ContentNotFoundException;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -61,8 +67,8 @@ import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.QueueSize;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
@@ -75,7 +81,9 @@ import org.apache.nifi.provenance.search.SearchTerm;
import org.apache.nifi.provenance.search.SearchTerms;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.search.SearchContext;
@@ -85,6 +93,7 @@ import org.apache.nifi.services.FlowService;
import org.apache.nifi.user.NiFiUser;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.web.DownloadableContent;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
@@ -104,15 +113,6 @@ import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.ProcessGroupStatusDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
-import org.apache.nifi.web.DownloadableContent;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.admin.service.UserService;
-import org.apache.nifi.authorization.DownloadAuthorization;
-import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.reporting.BulletinQuery;
-import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -501,7 +501,7 @@ public class ControllerFacade {
* Site-to-Site communications
*
* @return the socket port that the Cluster Manager is listening on for
- * Site-to-Site communications
+ * Site-to-Site communications
*/
public Integer getClusterManagerRemoteSiteListeningPort() {
return flowController.getClusterManagerRemoteSiteListeningPort();
@@ -512,7 +512,7 @@ public class ControllerFacade {
* Manager are secure
*
* @return whether or not Site-to-Site communications with the Cluster
- * Manager are secure
+ * Manager are secure
*/
public Boolean isClusterManagerRemoteSiteCommsSecure() {
return flowController.isClusterManagerRemoteSiteCommsSecure();
@@ -523,7 +523,7 @@ public class ControllerFacade {
* Site-to-Site communications
*
* @return the socket port that the local instance is listening on for
- * Site-to-Site communications
+ * Site-to-Site communications
*/
public Integer getRemoteSiteListeningPort() {
return flowController.getRemoteSiteListeningPort();
@@ -534,7 +534,7 @@ public class ControllerFacade {
* instance are secure
*
* @return whether or not Site-to-Site communications with the local
- * instance are secure
+ * instance are secure
*/
public Boolean isRemoteSiteCommsSecure() {
return flowController.isRemoteSiteCommsSecure();