You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2018/10/04 20:29:00 UTC
[04/14] nifi git commit: NIFI-5516: Implement Load-Balanced
Connections Refactoring StandardFlowFileQueue to have an
AbstractFlowFileQueue Refactored more into AbstractFlowFileQueue Added
documentation, cleaned up code some Refactored FlowFileQueue so th
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
index 303ca7b..b454d2d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestStandardFlowFileQueue.java
@@ -17,47 +17,20 @@
package org.apache.nifi.controller;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
-import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.ListFlowFileState;
import org.apache.nifi.controller.queue.ListFlowFileStatus;
+import org.apache.nifi.controller.queue.NopConnectionEventListener;
import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
-import org.apache.nifi.controller.repository.FlowFileSwapManager;
-import org.apache.nifi.controller.repository.IncompleteSwapFileException;
-import org.apache.nifi.controller.repository.SwapContents;
-import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
-import org.apache.nifi.controller.repository.SwapSummary;
-import org.apache.nifi.controller.repository.claim.ContentClaim;
-import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.swap.StandardSwapContents;
-import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
@@ -71,8 +44,22 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
public class TestStandardFlowFileQueue {
- private TestSwapManager swapManager = null;
+ private MockSwapManager swapManager = null;
private StandardFlowFileQueue queue = null;
private Connection connection = null;
@@ -98,7 +85,7 @@ public class TestStandardFlowFileQueue {
Mockito.when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
scheduler = Mockito.mock(ProcessScheduler.class);
- swapManager = new TestSwapManager();
+ swapManager = new MockSwapManager();
flowFileRepo = Mockito.mock(FlowFileRepository.class);
provRepo = Mockito.mock(ProvenanceEventRepository.class);
@@ -116,8 +103,8 @@ public class TestStandardFlowFileQueue {
}
}).when(provRepo).registerEvents(Mockito.any(Iterable.class));
- queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, 0L, "0 B");
- TestFlowFile.idGenerator.set(0L);
+ queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 10000, 0L, "0 B");
+ MockFlowFileRecord.resetIdGenerator();
}
@Test
@@ -125,7 +112,7 @@ public class TestStandardFlowFileQueue {
queue.setFlowFileExpiration("1 ms");
for (int i = 0; i < 100; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
}
// just make sure that the flowfiles have time to expire.
@@ -140,11 +127,11 @@ public class TestStandardFlowFileQueue {
assertNull(pulled);
assertEquals(100, expiredRecords.size());
- final QueueSize activeSize = queue.getActiveQueueSize();
+ final QueueSize activeSize = queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize();
assertEquals(0, activeSize.getObjectCount());
assertEquals(0L, activeSize.getByteCount());
- final QueueSize unackSize = queue.getUnacknowledgedQueueSize();
+ final QueueSize unackSize = queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize();
assertEquals(0, unackSize.getObjectCount());
assertEquals(0L, unackSize.getByteCount());
}
@@ -158,13 +145,13 @@ public class TestStandardFlowFileQueue {
assertFalse(queue.isFull());
for (int i = 0; i < 9; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertFalse(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
}
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertTrue(queue.isFull());
assertFalse(queue.isEmpty());
assertFalse(queue.isActiveQueueEmpty());
@@ -193,11 +180,11 @@ public class TestStandardFlowFileQueue {
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertFalse(queue.isFull());
}
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertTrue(queue.isFull());
Thread.sleep(100L);
@@ -226,11 +213,11 @@ public class TestStandardFlowFileQueue {
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertFalse(queue.isFull());
}
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertTrue(queue.isFull());
Thread.sleep(100L);
@@ -259,11 +246,11 @@ public class TestStandardFlowFileQueue {
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertFalse(queue.isFull());
}
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertTrue(queue.isFull());
Thread.sleep(100L);
@@ -284,11 +271,11 @@ public class TestStandardFlowFileQueue {
queue.setFlowFileExpiration("10 millis");
for (int i = 0; i < 9; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertFalse(queue.isFull());
}
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertTrue(queue.isFull());
Thread.sleep(100L);
@@ -306,25 +293,25 @@ public class TestStandardFlowFileQueue {
@Test
public void testSwapOutOccurs() {
for (int i = 0; i < 10000; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertEquals(0, swapManager.swapOutCalledCount);
assertEquals(i + 1, queue.size().getObjectCount());
assertEquals(i + 1, queue.size().getByteCount());
}
for (int i = 0; i < 9999; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertEquals(0, swapManager.swapOutCalledCount);
assertEquals(i + 10001, queue.size().getObjectCount());
assertEquals(i + 10001, queue.size().getByteCount());
}
- queue.put(new TestFlowFile(1000));
+ queue.put(new MockFlowFileRecord(1000));
assertEquals(1, swapManager.swapOutCalledCount);
assertEquals(20000, queue.size().getObjectCount());
assertEquals(20999, queue.size().getByteCount());
- assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+ assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
}
@Test
@@ -335,13 +322,13 @@ public class TestStandardFlowFileQueue {
long maxSize = 20000;
for (int i = 1; i <= 20000; i++) {
- queue.put(new TestFlowFile(maxSize - i));
+ queue.put(new MockFlowFileRecord(maxSize - i));
}
assertEquals(1, swapManager.swapOutCalledCount);
assertEquals(20000, queue.size().getObjectCount());
- assertEquals(10000, queue.getActiveQueueSize().getObjectCount());
+ assertEquals(10000, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>());
assertEquals(10000, flowFiles.size());
for (int i = 0; i < 10000; i++) {
@@ -352,37 +339,37 @@ public class TestStandardFlowFileQueue {
@Test
public void testSwapIn() {
for (int i = 1; i <= 20000; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
}
assertEquals(1, swapManager.swappedOut.size());
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertEquals(1, swapManager.swappedOut.size());
final Set<FlowFileRecord> exp = new HashSet<>();
for (int i = 0; i < 9999; i++) {
final FlowFileRecord flowFile = queue.poll(exp);
assertNotNull(flowFile);
- assertEquals(1, queue.getUnacknowledgedQueueSize().getObjectCount());
- assertEquals(1, queue.getUnacknowledgedQueueSize().getByteCount());
+ assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
+ assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount());
queue.acknowledge(Collections.singleton(flowFile));
- assertEquals(0, queue.getUnacknowledgedQueueSize().getObjectCount());
- assertEquals(0, queue.getUnacknowledgedQueueSize().getByteCount());
+ assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
+ assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount());
}
assertEquals(0, swapManager.swapInCalledCount);
- assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+ assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
assertNotNull(queue.poll(exp));
assertEquals(0, swapManager.swapInCalledCount);
- assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+ assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
assertEquals(1, swapManager.swapOutCalledCount);
assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
assertEquals(1, swapManager.swapInCalledCount);
- assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+ assertEquals(9999, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
assertTrue(swapManager.swappedOut.isEmpty());
@@ -392,14 +379,14 @@ public class TestStandardFlowFileQueue {
@Test
public void testSwapInWhenThresholdIsLessThanSwapSize() {
// create a queue where the swap threshold is less than 10k
- queue = new StandardFlowFileQueue("id", connection, flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, 0L, "0 B");
+ queue = new StandardFlowFileQueue("id", new NopConnectionEventListener(), flowFileRepo, provRepo, claimManager, scheduler, swapManager, null, 1000, 0L, "0 B");
for (int i = 1; i <= 20000; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
}
assertEquals(1, swapManager.swappedOut.size());
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
assertEquals(1, swapManager.swappedOut.size());
final Set<FlowFileRecord> exp = new HashSet<>();
@@ -412,26 +399,26 @@ public class TestStandardFlowFileQueue {
for (int i = 0; i < 999; i++) { //
final FlowFileRecord flowFile = queue.poll(exp);
assertNotNull(flowFile);
- assertEquals(1, queue.getUnacknowledgedQueueSize().getObjectCount());
- assertEquals(1, queue.getUnacknowledgedQueueSize().getByteCount());
+ assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
+ assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount());
queue.acknowledge(Collections.singleton(flowFile));
- assertEquals(0, queue.getUnacknowledgedQueueSize().getObjectCount());
- assertEquals(0, queue.getUnacknowledgedQueueSize().getByteCount());
+ assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
+ assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getUnacknowledgedQueueSize().getByteCount());
}
assertEquals(0, swapManager.swapInCalledCount);
- assertEquals(1, queue.getActiveQueueSize().getObjectCount());
+ assertEquals(1, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
assertNotNull(queue.poll(exp));
assertEquals(0, swapManager.swapInCalledCount);
- assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+ assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
assertEquals(1, swapManager.swapOutCalledCount);
assertNotNull(queue.poll(exp)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
assertEquals(1, swapManager.swapInCalledCount);
- assertEquals(9999, queue.getActiveQueueSize().getObjectCount());
+ assertEquals(9999, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
assertTrue(swapManager.swappedOut.isEmpty());
@@ -441,7 +428,7 @@ public class TestStandardFlowFileQueue {
@Test
public void testQueueCountsUpdatedWhenIncompleteSwapFile() {
for (int i = 1; i <= 20000; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
}
assertEquals(20000, queue.size().getObjectCount());
@@ -502,7 +489,7 @@ public class TestStandardFlowFileQueue {
@Test(timeout = 120000)
public void testDropSwappedFlowFiles() {
for (int i = 1; i <= 30000; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
}
assertEquals(2, swapManager.swappedOut.size());
@@ -524,7 +511,7 @@ public class TestStandardFlowFileQueue {
@Test(timeout = 5000)
public void testListFlowFilesOnlyActiveQueue() throws InterruptedException {
for (int i = 0; i < 9999; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
}
final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 10000);
@@ -544,7 +531,7 @@ public class TestStandardFlowFileQueue {
@Test(timeout = 5000)
public void testListFlowFilesResultsLimited() throws InterruptedException {
for (int i = 0; i < 30050; i++) {
- queue.put(new TestFlowFile());
+ queue.put(new MockFlowFileRecord());
}
final ListFlowFileStatus status = queue.listFlowFiles(UUID.randomUUID().toString(), 100);
@@ -565,7 +552,7 @@ public class TestStandardFlowFileQueue {
Collection<FlowFileRecord> tff = new ArrayList<>();
//Swap Size is 10000 records, so 30000 is equal to 3 swap files.
for (int i = 0; i < 30000; i++) {
- tff.add(new TestFlowFile());
+ tff.add(new MockFlowFileRecord());
}
queue.putAll(tff);
@@ -588,7 +575,7 @@ public class TestStandardFlowFileQueue {
public void testOOMEFollowedBySuccessfulSwapIn() {
final List<FlowFileRecord> flowFiles = new ArrayList<>();
for (int i = 0; i < 50000; i++) {
- flowFiles.add(new TestFlowFile());
+ flowFiles.add(new MockFlowFileRecord());
}
queue.putAll(flowFiles);
@@ -633,224 +620,13 @@ public class TestStandardFlowFileQueue {
queue.acknowledge(flowFiles);
assertNull(queue.poll(expiredRecords));
- assertEquals(0, queue.getActiveQueueSize().getObjectCount());
+ assertEquals(0, queue.getQueueDiagnostics().getLocalQueuePartitionDiagnostics().getActiveQueueSize().getObjectCount());
assertEquals(0, queue.size().getObjectCount());
assertTrue(swapManager.swappedOut.isEmpty());
}
- private class TestSwapManager implements FlowFileSwapManager {
- private final Map<String, List<FlowFileRecord>> swappedOut = new HashMap<>();
- int swapOutCalledCount = 0;
- int swapInCalledCount = 0;
-
- private int incompleteSwapFileRecordsToInclude = -1;
-
- private int failSwapInAfterN = -1;
- private Throwable failSwapInFailure = null;
-
- private void setSwapInFailure(final Throwable t) {
- this.failSwapInFailure = t;
- }
-
- @Override
- public void initialize(final SwapManagerInitializationContext initializationContext) {
-
- }
-
- public void enableIncompleteSwapFileException(final int flowFilesToInclude) {
- incompleteSwapFileRecordsToInclude = flowFilesToInclude;
- }
-
- @Override
- public String swapOut(List<FlowFileRecord> flowFiles, FlowFileQueue flowFileQueue) throws IOException {
- swapOutCalledCount++;
- final String location = UUID.randomUUID().toString();
- swappedOut.put(location, new ArrayList<>(flowFiles));
- return location;
- }
-
- private void throwIncompleteIfNecessary(final String swapLocation, final boolean remove) throws IOException {
- if (incompleteSwapFileRecordsToInclude > -1) {
- final SwapSummary summary = getSwapSummary(swapLocation);
-
- final List<FlowFileRecord> records;
- if (remove) {
- records = swappedOut.remove(swapLocation);
- } else {
- records = swappedOut.get(swapLocation);
- }
-
- final List<FlowFileRecord> partial = records.subList(0, incompleteSwapFileRecordsToInclude);
- final SwapContents partialContents = new StandardSwapContents(summary, partial);
- throw new IncompleteSwapFileException(swapLocation, partialContents);
- }
-
- if (swapInCalledCount > failSwapInAfterN && failSwapInAfterN > -1) {
- if (failSwapInFailure instanceof RuntimeException) {
- throw (RuntimeException) failSwapInFailure;
- }
- if (failSwapInFailure instanceof Error) {
- throw (Error) failSwapInFailure;
- }
-
- throw new RuntimeException(failSwapInFailure);
- }
- }
-
- @Override
- public SwapContents peek(String swapLocation, final FlowFileQueue flowFileQueue) throws IOException {
- throwIncompleteIfNecessary(swapLocation, false);
- return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.get(swapLocation));
- }
-
- @Override
- public SwapContents swapIn(String swapLocation, FlowFileQueue flowFileQueue) throws IOException {
- swapInCalledCount++;
- throwIncompleteIfNecessary(swapLocation, true);
- return new StandardSwapContents(getSwapSummary(swapLocation), swappedOut.remove(swapLocation));
- }
-
- @Override
- public List<String> recoverSwapLocations(FlowFileQueue flowFileQueue) throws IOException {
- return new ArrayList<>(swappedOut.keySet());
- }
-
- @Override
- public SwapSummary getSwapSummary(String swapLocation) throws IOException {
- final List<FlowFileRecord> flowFiles = swappedOut.get(swapLocation);
- if (flowFiles == null) {
- return StandardSwapSummary.EMPTY_SUMMARY;
- }
-
- int count = 0;
- long size = 0L;
- Long max = null;
- final List<ResourceClaim> resourceClaims = new ArrayList<>();
- for (final FlowFileRecord flowFile : flowFiles) {
- count++;
- size += flowFile.getSize();
- if (max == null || flowFile.getId() > max) {
- max = flowFile.getId();
- }
-
- if (flowFile.getContentClaim() != null) {
- resourceClaims.add(flowFile.getContentClaim().getResourceClaim());
- }
- }
-
- return new StandardSwapSummary(new QueueSize(count, size), max, resourceClaims);
- }
-
- @Override
- public void purge() {
- swappedOut.clear();
- }
- }
-
-
- private static class TestFlowFile implements FlowFileRecord {
- private static final AtomicLong idGenerator = new AtomicLong(0L);
-
- private final long id = idGenerator.getAndIncrement();
- private final long entryDate = System.currentTimeMillis();
- private final Map<String, String> attributes;
- private final long size;
-
- public TestFlowFile() {
- this(1L);
- }
-
- public TestFlowFile(final long size) {
- this(new HashMap<>(), size);
- }
-
- public TestFlowFile(final Map<String, String> attributes, final long size) {
- this.attributes = attributes;
- this.size = size;
-
- if (!attributes.containsKey(CoreAttributes.UUID.key())) {
- attributes.put(CoreAttributes.UUID.key(), createFakeUUID());
- }
- }
-
- private String createFakeUUID(){
- final String s=Long.toHexString(id);
- return new StringBuffer("00000000-0000-0000-0000000000000000".substring(0,(35-s.length()))+s).insert(23, '-').toString();
- }
-
- @Override
- public long getId() {
- return id;
- }
-
- @Override
- public long getEntryDate() {
- return entryDate;
- }
-
- @Override
- public long getLineageStartDate() {
- return entryDate;
- }
-
- @Override
- public Long getLastQueueDate() {
- return null;
- }
-
- @Override
- public boolean isPenalized() {
- return false;
- }
-
- @Override
- public String getAttribute(String key) {
- return attributes.get(key);
- }
-
- @Override
- public long getSize() {
- return size;
- }
-
- @Override
- public Map<String, String> getAttributes() {
- return Collections.unmodifiableMap(attributes);
- }
-
- @Override
- public int compareTo(final FlowFile o) {
- return Long.compare(id, o.getId());
- }
-
- @Override
- public long getPenaltyExpirationMillis() {
- return 0;
- }
-
- @Override
- public ContentClaim getContentClaim() {
- return null;
- }
-
- @Override
- public long getContentClaimOffset() {
- return 0;
- }
-
- @Override
- public long getLineageStartIndex() {
- return 0;
- }
-
- @Override
- public long getQueueDateIndex() {
- return 0;
- }
- }
-
private static class FlowFileSizePrioritizer implements FlowFilePrioritizer {
@Override
public int compare(final FlowFile o1, final FlowFile o2) {