You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:05 UTC
[5/7] nifi git commit: NIFI-2854: Refactor repositories and swap
files to use schema-based serialization so that nifi can be rolled back to a
previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
new file mode 100644
index 0000000..916fd76
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap;
+import org.apache.nifi.controller.repository.schema.ContentClaimSchema;
+import org.apache.nifi.controller.repository.schema.FlowFileSchema;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema;
+import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.Repetition;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+import org.apache.nifi.repository.schema.SchemaRecordWriter;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
+ private static final int MAX_ENCODING_VERSION = 1; // value reported by getVersion()
+
+ private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1; // schema emitted by writeHeader()
+ private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1; // passed to RepositoryRecordFieldMap when serializing a record
+
+ private final ResourceClaimManager resourceClaimManager; // passed to ContentClaimFieldMap.getContentClaim() during recovery
+ private volatile RecordSchema recoverySchema; // assigned by readHeader(); consumed by deserializeRecord()
+
+    /**
+     * Creates a serde bound to the given claim manager.
+     *
+     * @param resourceClaimManager manager handed to {@code ContentClaimFieldMap.getContentClaim}
+     *                             whenever a recovered record carries a content claim
+     */
+    public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) {
+        this.resourceClaimManager = resourceClaimManager;
+    }
+
+    /**
+     * Writes {@code writeSchema} to the given stream.
+     *
+     * @param out stream that receives the schema
+     * @throws IOException if the schema cannot be written
+     */
+    @Override
+    public void writeHeader(final DataOutputStream out) throws IOException {
+        final RecordSchema headerSchema = this.writeSchema;
+        headerSchema.writeTo(out);
+    }
+
+    /**
+     * Serializes an edit by writing the new record state in full.
+     *
+     * @param previousRecordState not consulted by this implementation
+     * @param newRecordState      the state that is written
+     * @param out                 stream that receives the serialized record
+     * @throws IOException if the record cannot be written
+     */
+    @Override
+    public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord newRecordState, final DataOutputStream out) throws IOException {
+        // Edits are not stored as deltas: delegate straight to the full-record path.
+        this.serializeRecord(newRecordState, out);
+    }
+
+    /**
+     * Serializes a single repository record.
+     * <p>
+     * The record's type selects the schema used for its fields; the resulting field map is then
+     * wrapped in a {@code RepositoryRecordUpdate} described by {@code writeSchema} -- the same
+     * schema instance that {@link #writeHeader(DataOutputStream)} emits -- and written to the stream.
+     *
+     * @param record the record to serialize
+     * @param out    stream that receives the serialized record
+     * @throws IOException              if the record cannot be written
+     * @throws IllegalArgumentException if the record's type is not one of the handled types
+     */
+    @Override
+    public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
+        final RecordSchema schema;
+        switch (record.getType()) {
+            case CREATE:
+            case UPDATE:
+                schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1;
+                break;
+            case CONTENTMISSING:
+            case DELETE:
+                // Records whose content is missing are written with the same schema as deletions.
+                schema = RepositoryRecordSchema.DELETE_SCHEMA_V1;
+                break;
+            case SWAP_IN:
+                schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V1;
+                break;
+            case SWAP_OUT:
+                schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V1;
+                break;
+            default:
+                throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen.
+        }
+
+        final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema);
+
+        // Wrap with the schema that was written in the header so the two can never diverge.
+        final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, writeSchema);
+
+        new SchemaRecordWriter().writeRecord(update, out);
+    }
+
+    /**
+     * Reads a schema from the given stream and keeps it as the recovery schema used by
+     * {@link #deserializeRecord(DataInputStream, int)}.
+     *
+     * @param in stream positioned at a serialized schema
+     * @throws IOException if the schema cannot be read
+     */
+    @Override
+    public void readHeader(final DataInputStream in) throws IOException {
+        final RecordSchema schemaFromHeader = RecordSchema.readFrom(in);
+        this.recoverySchema = schemaFromHeader;
+    }
+
+    /**
+     * Deserializes an edit by reading a complete record from the stream.
+     *
+     * @param in                  stream positioned at a serialized record
+     * @param currentRecordStates not consulted by this implementation
+     * @param version             passed through to {@link #deserializeRecord(DataInputStream, int)}
+     * @return the recovered record
+     * @throws IOException if the record cannot be read
+     */
+    @Override
+    public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
+        // Edits are stored as full records, so the current states are not needed to rebuild one.
+        return this.deserializeRecord(in, version);
+    }
+
+    /**
+     * Reads one record from the stream using the recovery schema and converts it into a
+     * {@link RepositoryRecord} according to its action type.
+     *
+     * @param in      stream positioned at a serialized record
+     * @param version not consulted by this implementation
+     * @return the recovered record
+     * @throws IOException if the record cannot be read, or if its action type is missing or is not
+     *                     a recognized {@link UpdateType}
+     */
+    @Override
+    public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
+        final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema);
+        final Record wrapperRecord = reader.readRecord(in);
+
+        // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the
+        // top level that indicates which type of record we have.
+        final Record record = (Record) wrapperRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V1);
+
+        final String actionType = (String) record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD);
+        if (actionType == null) {
+            // Enum.valueOf would fail with a NullPointerException; report bad data as an I/O problem instead.
+            throw new IOException("Found Repository Record with no Update Type");
+        }
+
+        // NOTE(review): serializeRecord also accepts CONTENTMISSING records, but there is no matching case
+        // below. Confirm which action type RepositoryRecordFieldMap persists for them.
+        final UpdateType updateType;
+        try {
+            updateType = UpdateType.valueOf(actionType);
+        } catch (final IllegalArgumentException e) {
+            // valueOf never returns for an unknown name, so the switch's default branch cannot cover this case.
+            throw new IOException("Found unrecognized Update Type '" + actionType + "'", e);
+        }
+
+        switch (updateType) {
+            case CREATE:
+                return createRecord(record);
+            case DELETE:
+                return deleteRecord(record);
+            case SWAP_IN:
+                return swapInRecord(record);
+            case SWAP_OUT:
+                return swapOutRecord(record);
+            case UPDATE:
+                return updateRecord(record);
+            default:
+                throw new IOException("Found unrecognized Update Type '" + actionType + "'");
+        }
+    }
+
+
+    /**
+     * Rebuilds a FlowFile from the fields of a recovered record and pairs it with the queue
+     * named by the record's queue identifier.
+     *
+     * @param record the recovered record whose fields describe the FlowFile
+     * @return a repository record holding the rebuilt FlowFile and its queue
+     */
+    @SuppressWarnings("unchecked")
+    private StandardRepositoryRecord createRecord(final Record record) {
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder();
+
+        final Long flowFileId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID);
+        builder.id(flowFileId);
+        final Long entryDate = (Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE);
+        builder.entryDate(entryDate);
+
+        // Queue date and its index are restored together.
+        final Long queuedAt = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE);
+        final Long queuedIndex = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX);
+        builder.lastQueued(queuedAt, queuedIndex);
+
+        // Likewise for the lineage start date and its index.
+        final Long lineageStartedAt = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE);
+        final Long lineageIndex = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX);
+        builder.lineageStart(lineageStartedAt, lineageIndex);
+
+        populateContentClaim(builder, record);
+        final Long flowFileSize = (Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE);
+        builder.size(flowFileSize);
+
+        final Map<String, String> attributes = (Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES);
+        builder.addAttributes(attributes);
+
+        final FlowFileRecord rebuiltFlowFile = builder.build();
+
+        final String queueIdentifier = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
+        return new StandardRepositoryRecord(getFlowFileQueue(queueIdentifier), rebuiltFlowFile);
+    }
+
+    /**
+     * Copies the content claim and claim offset from the record into the builder, when the
+     * record has a content claim field value. Leaves the builder untouched otherwise.
+     *
+     * @param ffBuilder builder that receives the claim and offset
+     * @param record    recovered record that may carry a content claim
+     */
+    private void populateContentClaim(final StandardFlowFileRecord.Builder ffBuilder, final Record record) {
+        final Object claimValue = record.getFieldValue(FlowFileSchema.CONTENT_CLAIM);
+        if (claimValue != null) {
+            final Record claimFields = (Record) claimValue;
+            final ContentClaim claim = ContentClaimFieldMap.getContentClaim(claimFields, resourceClaimManager);
+            final Long claimOffset = ContentClaimFieldMap.getContentClaimOffset(claimFields);
+
+            ffBuilder.contentClaim(claim);
+            ffBuilder.contentClaimOffset(claimOffset);
+        }
+    }
+
+    /**
+     * Recovers an UPDATE record. Handled exactly like a CREATE record.
+     *
+     * @param record the recovered record
+     * @return the repository record produced by {@code createRecord}
+     */
+    private RepositoryRecord updateRecord(final Record record) {
+        final StandardRepositoryRecord recovered = createRecord(record);
+        return recovered;
+    }
+
+    /**
+     * Recovers a DELETE record: builds a FlowFile carrying only the record id, attaches it to a
+     * repository record with no queue, and marks that record for deletion.
+     *
+     * @param record the recovered record
+     * @return a repository record marked for delete
+     */
+    private RepositoryRecord deleteRecord(final Record record) {
+        final Long deletedId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
+        final FlowFileRecord idOnlyFlowFile = new StandardFlowFileRecord.Builder().id(deletedId).build();
+
+        // The cast picks the constructor overload that takes a queue; no queue is associated here.
+        final StandardRepositoryRecord deleted = new StandardRepositoryRecord((FlowFileQueue) null, idOnlyFlowFile);
+        deleted.markForDelete();
+        return deleted;
+    }
+
+    /**
+     * Recovers a SWAP_IN record: rebuilds the FlowFile as for a CREATE record and then attaches
+     * the swap location read from the record.
+     *
+     * @param record the recovered record
+     * @return the rebuilt repository record with its swap location set
+     */
+    private RepositoryRecord swapInRecord(final Record record) {
+        final StandardRepositoryRecord swappedIn = createRecord(record);
+
+        final SimpleRecordField swapLocationField = new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE);
+        final String location = (String) record.getFieldValue(swapLocationField);
+        swappedIn.setSwapLocation(location);
+
+        return swappedIn;
+    }
+
+    /**
+     * Recovers a SWAP_OUT record: builds a FlowFile carrying only the record id and pairs it
+     * with the queue and swap location read from the record.
+     *
+     * @param record the recovered record
+     * @return a repository record holding the queue, the id-only FlowFile and the swap location
+     */
+    private RepositoryRecord swapOutRecord(final Record record) {
+        final Long swappedId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD);
+
+        final SimpleRecordField queueIdField = new SimpleRecordField(RepositoryRecordSchema.QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE);
+        final String queueIdentifier = (String) record.getFieldValue(queueIdField);
+
+        final SimpleRecordField swapLocationField = new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE);
+        final String location = (String) record.getFieldValue(swapLocationField);
+
+        final FlowFileQueue owningQueue = getFlowFileQueue(queueIdentifier);
+
+        final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder();
+        final FlowFileRecord idOnlyFlowFile = builder.id(swappedId).build();
+
+        return new StandardRepositoryRecord(owningQueue, idOnlyFlowFile, location);
+    }
+
+    /**
+     * Reports the encoding version of this serde.
+     *
+     * @return {@code MAX_ENCODING_VERSION}
+     */
+    @Override
+    public int getVersion() {
+        final int encodingVersion = MAX_ENCODING_VERSION;
+        return encodingVersion;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index a696e79..10f0d8c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -49,8 +49,6 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
-import org.apache.nifi.controller.repository.io.ByteCountingInputStream;
-import org.apache.nifi.controller.repository.io.ByteCountingOutputStream;
import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
import org.apache.nifi.controller.repository.io.FlowFileAccessInputStream;
@@ -75,6 +73,8 @@ import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -121,8 +121,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private int removedCount = 0; // number of flowfiles removed in this session
private long removedBytes = 0L; // size of all flowfiles removed in this session
- private final AtomicLong bytesRead = new AtomicLong(0L);
- private final AtomicLong bytesWritten = new AtomicLong(0L);
+ private long bytesRead = 0L;
+ private long bytesWritten = 0L;
private int flowFilesIn = 0, flowFilesOut = 0;
private long contentSizeIn = 0L, contentSizeOut = 0L;
@@ -975,8 +975,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Connectable connectable = context.getConnectable();
final StandardFlowFileEvent flowFileEvent = new StandardFlowFileEvent(connectable.getIdentifier());
- flowFileEvent.setBytesRead(bytesRead.get());
- flowFileEvent.setBytesWritten(bytesWritten.get());
+ flowFileEvent.setBytesRead(bytesRead);
+ flowFileEvent.setBytesWritten(bytesWritten);
// update event repository
try {
@@ -1064,8 +1064,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
flowFilesOut = 0;
removedCount = 0;
removedBytes = 0L;
- bytesRead.set(0L);
- bytesWritten.set(0L);
+ bytesRead = 0L;
+ bytesWritten = 0L;
connectionCounts.clear();
createdFlowFiles.clear();
removedFlowFiles.clear();
@@ -2006,8 +2006,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// callback for reading FlowFile 1 and if we used the same stream we'd be destroying the ability to read from FlowFile 1.
if (allowCachingOfStream && recursionSet.isEmpty()) {
if (currentReadClaim == claim) {
- if (currentReadClaimStream != null && currentReadClaimStream.getStreamLocation() <= offset) {
- final long bytesToSkip = offset - currentReadClaimStream.getStreamLocation();
+ if (currentReadClaimStream != null && currentReadClaimStream.getBytesConsumed() <= offset) {
+ final long bytesToSkip = offset - currentReadClaimStream.getBytesConsumed();
if (bytesToSkip > 0) {
StreamUtils.skip(currentReadClaimStream, bytesToSkip);
}
@@ -2023,7 +2023,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
currentReadClaim = claim;
- currentReadClaimStream = new ByteCountingInputStream(rawInStream, new AtomicLong(0L));
+ currentReadClaimStream = new ByteCountingInputStream(rawInStream);
StreamUtils.skip(currentReadClaimStream, offset);
// Use a non-closeable stream because we want to keep it open after the callback has finished so that we can
@@ -2270,8 +2270,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
writtenCount += footer.length;
}
} finally {
- bytesWritten.getAndAdd(writtenCount);
- bytesRead.getAndAdd(readCount);
+ bytesWritten += writtenCount;
+ bytesRead += readCount;
}
} catch (final ContentNotFoundException nfe) {
destroyContent(newClaim);
@@ -2311,8 +2311,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
validateRecordState(source);
final StandardRepositoryRecord record = records.get(source);
+ long writtenToFlowFile = 0L;
ContentClaim newClaim = null;
- final AtomicLong writtenHolder = new AtomicLong(0L);
try {
newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
@@ -2320,9 +2320,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(newClaim);
try (final OutputStream stream = context.getContentRepository().write(newClaim);
final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
- final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
- recursionSet.add(source);
- writer.process(new FlowFileAccessOutputStream(countingOut, source));
+ final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) {
+ try {
+ recursionSet.add(source);
+ writer.process(new FlowFileAccessOutputStream(countingOut, source));
+ } finally {
+ writtenToFlowFile = countingOut.getBytesWritten();
+ bytesWritten += countingOut.getBytesWritten();
+ }
} finally {
recursionSet.remove(source);
}
@@ -2342,8 +2347,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
resetWriteClaims(); // need to reset write claim before we can remove the claim
destroyContent(newClaim);
throw t;
- } finally {
- bytesWritten.getAndAdd(writtenHolder.get());
}
removeTemporaryClaim(record);
@@ -2351,7 +2354,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
.fromFlowFile(record.getCurrent())
.contentClaim(newClaim)
.contentClaimOffset(0)
- .size(writtenHolder.get())
+ .size(writtenToFlowFile)
.build();
record.setWorking(newFile);
@@ -2379,7 +2382,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final OutputStream rawOutStream = context.getContentRepository().write(newClaim);
final OutputStream bufferedOutStream = new BufferedOutputStream(rawOutStream);
- outStream = new ByteCountingOutputStream(bufferedOutStream, new AtomicLong(0L));
+ outStream = new ByteCountingOutputStream(bufferedOutStream);
originalByteWrittenCount = 0;
appendableStreams.put(newClaim, outStream);
@@ -2448,7 +2451,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
} finally {
if (outStream != null) {
final long bytesWrittenThisIteration = outStream.getBytesWritten() - originalByteWrittenCount;
- bytesWritten.getAndAdd(bytesWrittenThisIteration);
+ bytesWritten += bytesWrittenThisIteration;
}
}
@@ -2542,8 +2545,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final StandardRepositoryRecord record = records.get(source);
final ContentClaim currClaim = record.getCurrentClaim();
+ long writtenToFlowFile = 0L;
ContentClaim newClaim = null;
- final AtomicLong writtenHolder = new AtomicLong(0L);
try {
newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
@@ -2556,7 +2559,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
final OutputStream os = context.getContentRepository().write(newClaim);
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
- final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
+ final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) {
recursionSet.add(source);
@@ -2574,6 +2577,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
cnfeThrown = true;
throw cnfe;
} finally {
+ writtenToFlowFile = countingOut.getBytesWritten();
recursionSet.remove(source);
// if cnfeThrown is true, we don't need to re-throw the Exception; it will propagate.
@@ -2595,7 +2599,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
destroyContent(newClaim);
throw t;
} finally {
- bytesWritten.getAndAdd(writtenHolder.get());
+ bytesWritten += writtenToFlowFile;
}
removeTemporaryClaim(record);
@@ -2603,7 +2607,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
.fromFlowFile(record.getCurrent())
.contentClaim(newClaim)
.contentClaimOffset(0L)
- .size(writtenHolder.get())
+ .size(writtenToFlowFile)
.build();
record.setWorking(newFile);
@@ -2635,8 +2639,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
long newSize = 0L;
try {
newSize = context.getContentRepository().importFrom(source, newClaim);
- bytesWritten.getAndAdd(newSize);
- bytesRead.getAndAdd(newSize);
+ bytesWritten += newSize;
+ bytesRead += newSize;
} catch (final Throwable t) {
destroyContent(newClaim);
throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t);
@@ -2671,7 +2675,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination);
newSize = context.getContentRepository().importFrom(source, newClaim);
- bytesWritten.getAndAdd(newSize);
+ bytesWritten += newSize;
} catch (final IOException e) {
throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e);
}
@@ -2697,8 +2701,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
ensureNotAppending(record.getCurrentClaim());
final long copyCount = context.getContentRepository().exportTo(record.getCurrentClaim(), destination, append, record.getCurrentClaimOffset(), source.getSize());
- bytesRead.getAndAdd(copyCount);
- bytesWritten.getAndAdd(copyCount);
+ bytesRead += copyCount;
+ bytesWritten += copyCount;
} catch (final ContentNotFoundException nfe) {
handleContentNotFound(nfe, record);
} catch (final Throwable t) {
@@ -3016,8 +3020,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.removedCount += session.removedCount;
this.removedBytes += session.removedBytes;
- this.bytesRead += session.bytesRead.get();
- this.bytesWritten += session.bytesWritten.get();
+ this.bytesRead += session.bytesRead;
+ this.bytesWritten += session.bytesWritten;
this.flowFilesIn += session.flowFilesIn;
this.flowFilesOut += session.flowFilesOut;
this.contentSizeIn += session.contentSizeIn;
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 9c2a7d8..2a323de 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -16,12 +16,7 @@
*/
package org.apache.nifi.controller.repository;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.EOFException;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -45,16 +40,12 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
-import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
-import org.wali.SerDe;
import org.wali.SyncListener;
-import org.wali.UpdateType;
import org.wali.WriteAheadRepository;
/**
@@ -95,7 +86,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// effectively final
private WriteAheadRepository<RepositoryRecord> wal;
- private WriteAheadRecordSerde serde;
+ private RepositoryRecordSerdeFactory serdeFactory;
private ResourceClaimManager claimManager;
// WALI Provides the ability to register callbacks for when a Partition or the entire Repository is sync'ed with the underlying disk.
@@ -153,8 +144,8 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// TODO: Allow for backup path that can be used if disk out of space?? Would allow a snapshot to be stored on
// backup and then the data deleted from the normal location; then can move backup to normal location and
// delete backup. On restore, if no files exist in partition's directory, would have to check backup directory
- serde = new WriteAheadRecordSerde(claimManager);
- wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serde, this);
+ serdeFactory = new RepositoryRecordSerdeFactory(claimManager);
+ wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serdeFactory, this);
}
@Override
@@ -319,6 +310,10 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
repoRecords.add(repoRecord);
}
+ // TODO: We should probably update this to support bulk 'SWAP OUT' records. As-is, we have to write out a
+ // 'SWAP OUT' record for each FlowFile, which includes the Update Type, FlowFile ID, swap file location, and Queue ID.
+ // We could instead have a single record with Update Type of 'SWAP OUT' and just include swap file location, Queue ID,
+ // and all FlowFile ID's.
// update WALI to indicate that the records were swapped out.
wal.update(repoRecords, true);
@@ -347,9 +342,9 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
for (final FlowFileQueue queue : queueProvider.getAllQueues()) {
queueMap.put(queue.getIdentifier(), queue);
}
- serde.setQueueMap(queueMap);
+ serdeFactory.setQueueMap(queueMap);
final Collection<RepositoryRecord> recordList = wal.recoverRecords();
- serde.setQueueMap(null);
+ serdeFactory.setQueueMap(null);
for (final RepositoryRecord record : recordList) {
final ContentClaim claim = record.getCurrentClaim();
@@ -361,7 +356,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
// Determine the next sequence number for FlowFiles
long maxId = minimumSequenceNumber;
for (final RepositoryRecord record : recordList) {
- final long recordId = serde.getRecordIdentifier(record);
+ final long recordId = serdeFactory.getRecordIdentifier(record);
if (recordId > maxId) {
maxId = recordId;
}
@@ -414,526 +409,4 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
public int checkpoint() throws IOException {
return wal.checkpoint();
}
-
- private static class WriteAheadRecordSerde implements SerDe<RepositoryRecord> {
-
- private static final int CURRENT_ENCODING_VERSION = 9;
-
- public static final byte ACTION_CREATE = 0;
- public static final byte ACTION_UPDATE = 1;
- public static final byte ACTION_DELETE = 2;
- public static final byte ACTION_SWAPPED_OUT = 3;
- public static final byte ACTION_SWAPPED_IN = 4;
-
- private Map<String, FlowFileQueue> flowFileQueueMap = null;
- private long recordsRestored = 0L;
- private final ResourceClaimManager claimManager;
-
- public WriteAheadRecordSerde(final ResourceClaimManager claimManager) {
- this.claimManager = claimManager;
- }
-
- private void setQueueMap(final Map<String, FlowFileQueue> queueMap) {
- this.flowFileQueueMap = queueMap;
- }
-
- @Override
- public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException {
- serializeEdit(previousRecordState, record, out, false);
- }
-
- public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException {
- if (record.isMarkedForAbort()) {
- logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record);
- out.write(ACTION_DELETE);
- out.writeLong(getRecordIdentifier(record));
- serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
- return;
- }
-
- final UpdateType updateType = getUpdateType(record);
-
- if (updateType.equals(UpdateType.DELETE)) {
- out.write(ACTION_DELETE);
- out.writeLong(getRecordIdentifier(record));
- serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
- return;
- }
-
- // If there's a Destination Connection, that's the one that we want to associated with this record.
- // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection".
- // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen,
- // so we use the originalConnection instead
- FlowFileQueue associatedQueue = record.getDestination();
- if (associatedQueue == null) {
- associatedQueue = record.getOriginalQueue();
- }
-
- if (updateType.equals(UpdateType.SWAP_OUT)) {
- out.write(ACTION_SWAPPED_OUT);
- out.writeLong(getRecordIdentifier(record));
- out.writeUTF(associatedQueue.getIdentifier());
- out.writeUTF(getLocation(record));
- return;
- }
-
- final FlowFile flowFile = record.getCurrent();
- final ContentClaim claim = record.getCurrentClaim();
-
- switch (updateType) {
- case UPDATE:
- out.write(ACTION_UPDATE);
- break;
- case CREATE:
- out.write(ACTION_CREATE);
- break;
- case SWAP_IN:
- out.write(ACTION_SWAPPED_IN);
- break;
- default:
- throw new AssertionError();
- }
-
- out.writeLong(getRecordIdentifier(record));
- out.writeLong(flowFile.getEntryDate());
- out.writeLong(flowFile.getLineageStartDate());
- out.writeLong(flowFile.getLineageStartIndex());
-
- final Long queueDate = flowFile.getLastQueueDate();
- out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
- out.writeLong(flowFile.getQueueDateIndex());
- out.writeLong(flowFile.getSize());
-
- if (associatedQueue == null) {
- logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart",
- new Object[]{this, record});
- writeString("", out);
- } else {
- writeString(associatedQueue.getIdentifier(), out);
- }
-
- serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
-
- if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
- out.write(1); // indicate attributes changed
- final Map<String, String> attributes = flowFile.getAttributes();
- out.writeInt(attributes.size());
- for (final Map.Entry<String, String> entry : attributes.entrySet()) {
- writeString(entry.getKey(), out);
- writeString(entry.getValue(), out);
- }
- } else {
- out.write(0); // indicate attributes did not change
- }
-
- if (updateType == UpdateType.SWAP_IN) {
- out.writeUTF(record.getSwapLocation());
- }
- }
-
- @Override
- public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
- final int action = in.read();
- final long recordId = in.readLong();
- if (action == ACTION_DELETE) {
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
-
- if (version > 4) {
- deserializeClaim(in, version, ffBuilder);
- }
-
- final FlowFileRecord flowFileRecord = ffBuilder.build();
- final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
- record.markForDelete();
-
- return record;
- }
-
- if (action == ACTION_SWAPPED_OUT) {
- final String queueId = in.readUTF();
- final String location = in.readUTF();
- final FlowFileQueue queue = flowFileQueueMap.get(queueId);
-
- final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .id(recordId)
- .build();
-
- return new StandardRepositoryRecord(queue, flowFileRecord, location);
- }
-
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
- final RepositoryRecord record = currentRecordStates.get(recordId);
- ffBuilder.id(recordId);
- if (record != null) {
- ffBuilder.fromFlowFile(record.getCurrent());
- }
- ffBuilder.entryDate(in.readLong());
-
- if (version > 1) {
- // read the lineage identifiers and lineage start date, which were added in version 2.
- if (version < 9) {
- final int numLineageIds = in.readInt();
- for (int i = 0; i < numLineageIds; i++) {
- in.readUTF(); //skip identifiers
- }
- }
- final long lineageStartDate = in.readLong();
- final long lineageStartIndex;
- if (version > 7) {
- lineageStartIndex = in.readLong();
- } else {
- lineageStartIndex = 0L;
- }
- ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
- if (version > 5) {
- final long lastQueueDate = in.readLong();
- final long queueDateIndex;
- if (version > 7) {
- queueDateIndex = in.readLong();
- } else {
- queueDateIndex = 0L;
- }
-
- ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
- }
- }
-
- ffBuilder.size(in.readLong());
- final String connectionId = readString(in);
-
- logger.debug("{} -> {}", new Object[]{recordId, connectionId});
-
- deserializeClaim(in, version, ffBuilder);
-
- // recover new attributes, if they changed
- final int attributesChanged = in.read();
- if (attributesChanged == -1) {
- throw new EOFException();
- } else if (attributesChanged == 1) {
- final int numAttributes = in.readInt();
- final Map<String, String> attributes = new HashMap<>();
- for (int i = 0; i < numAttributes; i++) {
- final String key = readString(in);
- final String value = readString(in);
- attributes.put(key, value);
- }
-
- ffBuilder.addAttributes(attributes);
- } else if (attributesChanged != 0) {
- throw new IOException("Attribute Change Qualifier not found in stream; found value: "
- + attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
- }
-
- final FlowFileRecord flowFile = ffBuilder.build();
- String swapLocation = null;
- if (action == ACTION_SWAPPED_IN) {
- swapLocation = in.readUTF();
- }
-
- final StandardRepositoryRecord standardRepoRecord;
-
- if (flowFileQueueMap == null) {
- standardRepoRecord = new StandardRepositoryRecord(null, flowFile);
- } else {
- final FlowFileQueue queue = flowFileQueueMap.get(connectionId);
- standardRepoRecord = new StandardRepositoryRecord(queue, flowFile);
- if (swapLocation != null) {
- standardRepoRecord.setSwapLocation(swapLocation);
- }
-
- if (connectionId.isEmpty()) {
- logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile);
- standardRepoRecord.markForAbort();
- } else if (queue == null) {
- logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId);
- standardRepoRecord.markForAbort();
- }
- }
-
- recordsRestored++;
- return standardRepoRecord;
- }
-
- @Override
- public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
- final int action = in.read();
- if (action == -1) {
- return null;
- }
-
- final long recordId = in.readLong();
- if (action == ACTION_DELETE) {
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
-
- if (version > 4) {
- deserializeClaim(in, version, ffBuilder);
- }
-
- final FlowFileRecord flowFileRecord = ffBuilder.build();
- final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
- record.markForDelete();
- return record;
- }
-
- // if action was not delete, it must be create/swap in
- final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
- final long entryDate = in.readLong();
-
- if (version > 1) {
- // read the lineage identifiers and lineage start date, which were added in version 2.
- if (version < 9) {
- final int numLineageIds = in.readInt();
- for (int i = 0; i < numLineageIds; i++) {
- in.readUTF(); //skip identifiers
- }
- }
-
- final long lineageStartDate = in.readLong();
- final long lineageStartIndex;
- if (version > 7) {
- lineageStartIndex = in.readLong();
- } else {
- lineageStartIndex = 0L;
- }
- ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
-
- if (version > 5) {
- final long lastQueueDate = in.readLong();
- final long queueDateIndex;
- if (version > 7) {
- queueDateIndex = in.readLong();
- } else {
- queueDateIndex = 0L;
- }
-
- ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
- }
- }
-
- final long size = in.readLong();
- final String connectionId = readString(in);
-
- logger.debug("{} -> {}", new Object[]{recordId, connectionId});
-
- ffBuilder.id(recordId);
- ffBuilder.entryDate(entryDate);
- ffBuilder.size(size);
-
- deserializeClaim(in, version, ffBuilder);
-
- final int attributesChanged = in.read();
- if (attributesChanged == 1) {
- final int numAttributes = in.readInt();
- final Map<String, String> attributes = new HashMap<>();
- for (int i = 0; i < numAttributes; i++) {
- final String key = readString(in);
- final String value = readString(in);
- attributes.put(key, value);
- }
-
- ffBuilder.addAttributes(attributes);
- } else if (attributesChanged == -1) {
- throw new EOFException();
- } else if (attributesChanged != 0) {
- throw new IOException("Attribute Change Qualifier not found in stream; found value: "
- + attributesChanged + " after successfully restoring " + recordsRestored + " records");
- }
-
- final FlowFileRecord flowFile = ffBuilder.build();
- String swapLocation = null;
- if (action == ACTION_SWAPPED_IN) {
- swapLocation = in.readUTF();
- }
-
- final StandardRepositoryRecord record;
-
- if (flowFileQueueMap == null) {
- record = new StandardRepositoryRecord(null, flowFile);
- } else {
- final FlowFileQueue queue = flowFileQueueMap.get(connectionId);
- record = new StandardRepositoryRecord(queue, flowFile);
- if (swapLocation != null) {
- record.setSwapLocation(swapLocation);
- }
-
- if (connectionId.isEmpty()) {
- logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile);
- record.markForAbort();
- } else if (queue == null) {
- logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId);
- record.markForAbort();
- }
- }
-
- recordsRestored++;
- return record;
- }
-
- @Override
- public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
- serializeEdit(null, record, out, true);
- }
-
- private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException {
- if (claim == null) {
- out.write(0);
- } else {
- out.write(1);
-
- final ResourceClaim resourceClaim = claim.getResourceClaim();
- writeString(resourceClaim.getId(), out);
- writeString(resourceClaim.getContainer(), out);
- writeString(resourceClaim.getSection(), out);
- out.writeLong(claim.getOffset());
- out.writeLong(claim.getLength());
-
- out.writeLong(offset);
- out.writeBoolean(resourceClaim.isLossTolerant());
- }
- }
-
- private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException {
- // determine current Content Claim.
- final int claimExists = in.read();
- if (claimExists == 1) {
- final String claimId;
- if (serializationVersion < 4) {
- claimId = String.valueOf(in.readLong());
- } else {
- claimId = readString(in);
- }
-
- final String container = readString(in);
- final String section = readString(in);
-
- final long resourceOffset;
- final long resourceLength;
- if (serializationVersion < 7) {
- resourceOffset = 0L;
- resourceLength = -1L;
- } else {
- resourceOffset = in.readLong();
- resourceLength = in.readLong();
- }
-
- final long claimOffset = in.readLong();
-
- final boolean lossTolerant;
- if (serializationVersion >= 3) {
- lossTolerant = in.readBoolean();
- } else {
- lossTolerant = false;
- }
-
- final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
- final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
- contentClaim.setLength(resourceLength);
-
- ffBuilder.contentClaim(contentClaim);
- ffBuilder.contentClaimOffset(claimOffset);
- } else if (claimExists == -1) {
- throw new EOFException();
- } else if (claimExists != 0) {
- throw new IOException("Claim Existence Qualifier not found in stream; found value: "
- + claimExists + " after successfully restoring " + recordsRestored + " records");
- }
- }
-
- private void writeString(final String toWrite, final OutputStream out) throws IOException {
- final byte[] bytes = toWrite.getBytes("UTF-8");
- final int utflen = bytes.length;
-
- if (utflen < 65535) {
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- } else {
- out.write(255);
- out.write(255);
- out.write(utflen >>> 24);
- out.write(utflen >>> 16);
- out.write(utflen >>> 8);
- out.write(utflen);
- out.write(bytes);
- }
- }
-
- private String readString(final InputStream in) throws IOException {
- final Integer numBytes = readFieldLength(in);
- if (numBytes == null) {
- throw new EOFException();
- }
- final byte[] bytes = new byte[numBytes];
- fillBuffer(in, bytes, numBytes);
- return new String(bytes, "UTF-8");
- }
-
- private Integer readFieldLength(final InputStream in) throws IOException {
- final int firstValue = in.read();
- final int secondValue = in.read();
- if (firstValue < 0) {
- return null;
- }
- if (secondValue < 0) {
- throw new EOFException();
- }
- if (firstValue == 0xff && secondValue == 0xff) {
- final int ch1 = in.read();
- final int ch2 = in.read();
- final int ch3 = in.read();
- final int ch4 = in.read();
- if ((ch1 | ch2 | ch3 | ch4) < 0) {
- throw new EOFException();
- }
- return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
- } else {
- return (firstValue << 8) + secondValue;
- }
- }
-
- private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
- int bytesRead;
- int totalBytesRead = 0;
- while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
- totalBytesRead += bytesRead;
- }
- if (totalBytesRead != length) {
- throw new EOFException();
- }
- }
-
- @Override
- public Long getRecordIdentifier(final RepositoryRecord record) {
- return record.getCurrent().getId();
- }
-
- @Override
- public UpdateType getUpdateType(final RepositoryRecord record) {
- switch (record.getType()) {
- case CONTENTMISSING:
- case DELETE:
- return UpdateType.DELETE;
- case CREATE:
- return UpdateType.CREATE;
- case UPDATE:
- return UpdateType.UPDATE;
- case SWAP_OUT:
- return UpdateType.SWAP_OUT;
- case SWAP_IN:
- return UpdateType.SWAP_IN;
- }
- return null;
- }
-
- @Override
- public int getVersion() {
- return CURRENT_ENCODING_VERSION;
- }
-
- @Override
- public String getLocation(final RepositoryRecord record) {
- return record.getSwapLocation();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
new file mode 100644
index 0000000..e8ce44e
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.flowfile.FlowFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wali.SerDe;
+import org.wali.UpdateType;
+
+public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> {
+ private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);
+
+ // Encoding version reported by getVersion(); the deserialize methods branch on the version they are handed
+ // to decide which fields are present in the stream.
+ private static final int CURRENT_ENCODING_VERSION = 9;
+
+ // One-byte action codes written as the first byte of every serialized record/edit.
+ public static final byte ACTION_CREATE = 0;
+ public static final byte ACTION_UPDATE = 1;
+ public static final byte ACTION_DELETE = 2;
+ public static final byte ACTION_SWAPPED_OUT = 3;
+ public static final byte ACTION_SWAPPED_IN = 4;
+
+ // Count of records fully deserialized by this instance; only used to enrich error messages.
+ private long recordsRestored = 0L;
+ // Used by deserializeClaim to create the ResourceClaim behind each restored content claim.
+ private final ResourceClaimManager claimManager;
+
+ /**
+ * Creates a serde that uses the given manager to create Resource Claims while deserializing content claims.
+ *
+ * @param claimManager the manager that deserializeClaim asks for new Resource Claims
+ */
+ public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) {
+ this.claimManager = claimManager;
+ }
+
+ /**
+ * Serializes an edit without forcing the attributes to be written; delegates to the four-argument overload
+ * with forceAttributesWritten set to false.
+ *
+ * @param previousRecordState the prior state of the record; passed through to the overload
+ * @param record the record to serialize
+ * @param out the stream to write to
+ * @throws IOException if writing to the stream fails
+ */
+ @Override
+ public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException {
+ serializeEdit(previousRecordState, record, out, false);
+ }
+
+ /**
+  * Serializes one repository record as a single edit.
+  * <p>
+  * Every edit starts with a one-byte action code and the record id. A record that is marked for abort, or whose
+  * update type is DELETE, is written as a DELETE followed by its content claim. A SWAP OUT is written as the
+  * queue id and swap location only. CREATE, UPDATE and SWAP IN write the full FlowFile description: entry date,
+  * lineage start date/index, last queue date/index, size, queue id, content claim, an attributes-changed flag
+  * (followed by all attributes when set) and, for SWAP IN, the swap location.
+  * <p>
+  * Values that {@code writeUTF} would reject are checked before the first byte is written, so a record that
+  * cannot be serialized fails without leaving a truncated edit in the stream.
+  *
+  * @param previousRecordState the prior state of the record; not used by this implementation
+  * @param record the record to serialize
+  * @param out the stream to write to
+  * @param forceAttributesWritten if true, the attributes are written even when the record does not report them as changed
+  * @throws IOException if writing to the stream fails
+  * @throws NullPointerException if a SWAP OUT record has no queue or swap location, or a SWAP IN record has no swap location
+  */
+ public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException {
+     if (record.isMarkedForAbort()) {
+         logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record);
+         writeDeleteAction(record, out);
+         return;
+     }
+
+     final UpdateType updateType = getUpdateType(record);
+     if (updateType == UpdateType.DELETE) {
+         writeDeleteAction(record, out);
+         return;
+     }
+
+     // If there's a Destination Connection, that's the one that we want to associate with this record.
+     // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection".
+     // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen,
+     // so we use the originalConnection instead
+     FlowFileQueue associatedQueue = record.getDestination();
+     if (associatedQueue == null) {
+         associatedQueue = record.getOriginalQueue();
+     }
+
+     if (updateType == UpdateType.SWAP_OUT) {
+         // Validate up front: failing after the action byte and id were written would leave a partial edit behind.
+         if (associatedQueue == null) {
+             throw new NullPointerException("Cannot serialize SWAP OUT of " + record + " because it has no FlowFile Queue associated with it");
+         }
+         final String swapOutLocation = getLocation(record);
+         if (swapOutLocation == null) {
+             throw new NullPointerException("Cannot serialize SWAP OUT of " + record + " because it has no swap location");
+         }
+
+         out.write(ACTION_SWAPPED_OUT);
+         out.writeLong(getRecordIdentifier(record));
+         out.writeUTF(associatedQueue.getIdentifier());
+         out.writeUTF(swapOutLocation);
+         return;
+     }
+
+     final byte action;
+     switch (updateType) {
+         case UPDATE:
+             action = ACTION_UPDATE;
+             break;
+         case CREATE:
+             action = ACTION_CREATE;
+             break;
+         case SWAP_IN:
+             action = ACTION_SWAPPED_IN;
+             break;
+         default:
+             throw new AssertionError("Unexpected Update Type " + updateType);
+     }
+
+     // The swap location is the last field of a SWAP IN edit; check it before anything is written.
+     final boolean swapIn = updateType == UpdateType.SWAP_IN;
+     final String swapInLocation = swapIn ? record.getSwapLocation() : null;
+     if (swapIn && swapInLocation == null) {
+         throw new NullPointerException("Cannot serialize SWAP IN of " + record + " because it has no swap location");
+     }
+
+     final FlowFile flowFile = record.getCurrent();
+     final ContentClaim claim = record.getCurrentClaim();
+
+     out.write(action);
+     out.writeLong(getRecordIdentifier(record));
+     out.writeLong(flowFile.getEntryDate());
+     out.writeLong(flowFile.getLineageStartDate());
+     out.writeLong(flowFile.getLineageStartIndex());
+
+     // A FlowFile that has never been queued is stamped with the current time.
+     final Long queueDate = flowFile.getLastQueueDate();
+     out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
+     out.writeLong(flowFile.getQueueDateIndex());
+     out.writeLong(flowFile.getSize());
+
+     if (associatedQueue == null) {
+         // An empty queue id tells the deserializer to mark the record for abort.
+         logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart", this, record);
+         writeString("", out);
+     } else {
+         writeString(associatedQueue.getIdentifier(), out);
+     }
+
+     serializeContentClaim(claim, record.getCurrentClaimOffset(), out);
+
+     if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || swapIn) {
+         out.write(1); // indicate attributes changed
+         final Map<String, String> attributes = flowFile.getAttributes();
+         out.writeInt(attributes.size());
+         for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+             writeString(entry.getKey(), out);
+             writeString(entry.getValue(), out);
+         }
+     } else {
+         out.write(0); // indicate attributes did not change
+     }
+
+     if (swapIn) {
+         out.writeUTF(swapInLocation);
+     }
+ }
+
+ /** Writes a DELETE edit: the action byte, the record id and the record's current content claim. */
+ private void writeDeleteAction(final RepositoryRecord record, final DataOutputStream out) throws IOException {
+     out.write(ACTION_DELETE);
+     out.writeLong(getRecordIdentifier(record));
+     serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out);
+ }
+
+ /**
+ * Reads one edit from the stream and turns it into a repository record.
+ * <p>
+ * An edit starts with a one-byte action code followed by the record id. What follows depends on the action and on
+ * the encoding version the data was written with. A DELETE yields a record marked for delete with no queue. A
+ * SWAPPED_OUT yields a record that carries only the id, the queue and the swap location. Any other action is
+ * parsed as a full FlowFile description that is layered on top of the record's prior state, if one exists in
+ * currentRecordStates.
+ * <p>
+ * NOTE(review): the action byte is not checked for end-of-stream (-1) or for unknown values here; an unknown value
+ * is parsed as a create/update edit.
+ *
+ * @param in the stream positioned at the start of an edit
+ * @param currentRecordStates the records restored so far, looked up by record id
+ * @param version the encoding version the stream was written with
+ * @return the record described by the edit
+ * @throws IOException if the stream ends prematurely or contains an unexpected marker value
+ */
+ @Override
+ public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException {
+ final int action = in.read();
+ final long recordId = in.readLong();
+ if (action == ACTION_DELETE) {
+ final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
+
+ // a content claim is only read for DELETE edits written with a version above 4
+ if (version > 4) {
+ deserializeClaim(in, version, ffBuilder);
+ }
+
+ final FlowFileRecord flowFileRecord = ffBuilder.build();
+ final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+ record.markForDelete();
+
+ return record;
+ }
+
+ if (action == ACTION_SWAPPED_OUT) {
+ // mirrors the SWAP OUT layout of serializeEdit: queue id, then swap location, both via writeUTF
+ final String queueId = in.readUTF();
+ final String location = in.readUTF();
+ final FlowFileQueue queue = getFlowFileQueue(queueId);
+
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .id(recordId)
+ .build();
+
+ return new StandardRepositoryRecord(queue, flowFileRecord, location);
+ }
+
+ // Start from the previously restored state of this record, if any, and overwrite it with the fields in this edit.
+ final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+ final RepositoryRecord record = currentRecordStates.get(recordId);
+ ffBuilder.id(recordId);
+ if (record != null) {
+ ffBuilder.fromFlowFile(record.getCurrent());
+ }
+ ffBuilder.entryDate(in.readLong());
+
+ if (version > 1) {
+ // read the lineage identifiers and lineage start date, which were added in version 2.
+ if (version < 9) {
+ final int numLineageIds = in.readInt();
+ for (int i = 0; i < numLineageIds; i++) {
+ in.readUTF(); //skip identifiers
+ }
+ }
+ final long lineageStartDate = in.readLong();
+ final long lineageStartIndex;
+ // the lineage start index is only present for versions above 7
+ if (version > 7) {
+ lineageStartIndex = in.readLong();
+ } else {
+ lineageStartIndex = 0L;
+ }
+ ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+ // the last queue date is only present for versions above 5, its index for versions above 7
+ if (version > 5) {
+ final long lastQueueDate = in.readLong();
+ final long queueDateIndex;
+ if (version > 7) {
+ queueDateIndex = in.readLong();
+ } else {
+ queueDateIndex = 0L;
+ }
+
+ ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+ }
+ }
+
+ ffBuilder.size(in.readLong());
+ final String connectionId = readString(in);
+
+ logger.debug("{} -> {}", new Object[] {recordId, connectionId});
+
+ deserializeClaim(in, version, ffBuilder);
+
+ // recover new attributes, if they changed
+ final int attributesChanged = in.read();
+ if (attributesChanged == -1) {
+ throw new EOFException();
+ } else if (attributesChanged == 1) {
+ final int numAttributes = in.readInt();
+ final Map<String, String> attributes = new HashMap<>();
+ for (int i = 0; i < numAttributes; i++) {
+ final String key = readString(in);
+ final String value = readString(in);
+ attributes.put(key, value);
+ }
+
+ ffBuilder.addAttributes(attributes);
+ } else if (attributesChanged != 0) {
+ throw new IOException("Attribute Change Qualifier not found in stream; found value: "
+ + attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
+ }
+
+ final FlowFileRecord flowFile = ffBuilder.build();
+ // a SWAP IN edit ends with the swap location
+ String swapLocation = null;
+ if (action == ACTION_SWAPPED_IN) {
+ swapLocation = in.readUTF();
+ }
+
+ final FlowFileQueue queue = getFlowFileQueue(connectionId);
+ final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile);
+ if (swapLocation != null) {
+ standardRepoRecord.setSwapLocation(swapLocation);
+ }
+
+ // serializeEdit writes an empty queue id when the record had no connection; such records, and records whose
+ // queue cannot be resolved, are marked for abort
+ if (connectionId.isEmpty()) {
+ logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile);
+ standardRepoRecord.markForAbort();
+ } else if (queue == null) {
+ logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId);
+ standardRepoRecord.markForAbort();
+ }
+
+ recordsRestored++;
+ return standardRepoRecord;
+ }
+
+ /**
+ * Reads one complete record from the stream, without consulting any previously restored state.
+ * <p>
+ * Returns null when the stream is already at its end. A DELETE yields a record marked for delete with no queue.
+ * Every other action is parsed as a full FlowFile description, using the same version-dependent layout as
+ * deserializeEdit.
+ * <p>
+ * NOTE(review): ACTION_SWAPPED_OUT is not handled separately here although serializeRecord delegates to
+ * serializeEdit, which writes a shorter layout for SWAP OUT; presumably such records never reach this method.
+ * Confirm against the write-ahead log implementation.
+ *
+ * @param in the stream positioned at the start of a record
+ * @param version the encoding version the stream was written with
+ * @return the restored record, or null if there is no more data
+ * @throws IOException if the stream ends prematurely or contains an unexpected marker value
+ */
+ @Override
+ public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
+ final int action = in.read();
+ // -1 means the stream ended cleanly before another record started
+ if (action == -1) {
+ return null;
+ }
+
+ final long recordId = in.readLong();
+ if (action == ACTION_DELETE) {
+ final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);
+
+ // a content claim is only read for DELETE records written with a version above 4
+ if (version > 4) {
+ deserializeClaim(in, version, ffBuilder);
+ }
+
+ final FlowFileRecord flowFileRecord = ffBuilder.build();
+ final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord);
+ record.markForDelete();
+ return record;
+ }
+
+ // if action was not delete, it must be create/swap in
+ final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
+ final long entryDate = in.readLong();
+
+ if (version > 1) {
+ // read the lineage identifiers and lineage start date, which were added in version 2.
+ if (version < 9) {
+ final int numLineageIds = in.readInt();
+ for (int i = 0; i < numLineageIds; i++) {
+ in.readUTF(); //skip identifiers
+ }
+ }
+
+ final long lineageStartDate = in.readLong();
+ final long lineageStartIndex;
+ // the lineage start index is only present for versions above 7
+ if (version > 7) {
+ lineageStartIndex = in.readLong();
+ } else {
+ lineageStartIndex = 0L;
+ }
+ ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);
+
+ // the last queue date is only present for versions above 5, its index for versions above 7
+ if (version > 5) {
+ final long lastQueueDate = in.readLong();
+ final long queueDateIndex;
+ if (version > 7) {
+ queueDateIndex = in.readLong();
+ } else {
+ queueDateIndex = 0L;
+ }
+
+ ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
+ }
+ }
+
+ final long size = in.readLong();
+ final String connectionId = readString(in);
+
+ logger.debug("{} -> {}", new Object[] {recordId, connectionId});
+
+ ffBuilder.id(recordId);
+ ffBuilder.entryDate(entryDate);
+ ffBuilder.size(size);
+
+ deserializeClaim(in, version, ffBuilder);
+
+ // one-byte flag: 1 means an attribute count and key/value pairs follow, 0 means no attributes were written
+ final int attributesChanged = in.read();
+ if (attributesChanged == 1) {
+ final int numAttributes = in.readInt();
+ final Map<String, String> attributes = new HashMap<>();
+ for (int i = 0; i < numAttributes; i++) {
+ final String key = readString(in);
+ final String value = readString(in);
+ attributes.put(key, value);
+ }
+
+ ffBuilder.addAttributes(attributes);
+ } else if (attributesChanged == -1) {
+ throw new EOFException();
+ } else if (attributesChanged != 0) {
+ throw new IOException("Attribute Change Qualifier not found in stream; found value: "
+ + attributesChanged + " after successfully restoring " + recordsRestored + " records");
+ }
+
+ final FlowFileRecord flowFile = ffBuilder.build();
+ // a SWAP IN record ends with the swap location
+ String swapLocation = null;
+ if (action == ACTION_SWAPPED_IN) {
+ swapLocation = in.readUTF();
+ }
+
+ final StandardRepositoryRecord record;
+ final FlowFileQueue queue = getFlowFileQueue(connectionId);
+ record = new StandardRepositoryRecord(queue, flowFile);
+ if (swapLocation != null) {
+ record.setSwapLocation(swapLocation);
+ }
+
+ // records without a queue id, or whose queue cannot be resolved, are marked for abort
+ if (connectionId.isEmpty()) {
+ logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile);
+ record.markForAbort();
+ } else if (queue == null) {
+ logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId);
+ record.markForAbort();
+ }
+
+ recordsRestored++;
+ return record;
+ }
+
+ /**
+ * Serializes a complete record by delegating to serializeEdit with no previous state and with
+ * forceAttributesWritten set to true, so the attributes are always included.
+ *
+ * @param record the record to serialize
+ * @param out the stream to write to
+ * @throws IOException if writing to the stream fails
+ */
+ @Override
+ public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException {
+ serializeEdit(null, record, out, true);
+ }
+
+ /**
+  * Writes a content claim, preceded by a one-byte presence marker.
+  * <p>
+  * A missing claim is written as the single byte 0. Otherwise the byte 1 is followed by the resource claim's id,
+  * container and section, the claim's offset and length, the given FlowFile offset and the loss-tolerant flag.
+  *
+  * @param claim the claim to write, or null if there is none
+  * @param offset the FlowFile's offset into the claim
+  * @param out the stream to write to
+  * @throws IOException if writing to the stream fails
+  */
+ private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException {
+     // No claim: the marker alone is the whole encoding.
+     if (claim == null) {
+         out.write(0);
+         return;
+     }
+
+     out.write(1);
+
+     // Identity of the underlying resource.
+     final ResourceClaim resource = claim.getResourceClaim();
+     writeString(resource.getId(), out);
+     writeString(resource.getContainer(), out);
+     writeString(resource.getSection(), out);
+
+     // Position of the claim within the resource, then of the FlowFile within the claim.
+     out.writeLong(claim.getOffset());
+     out.writeLong(claim.getLength());
+     out.writeLong(offset);
+
+     out.writeBoolean(resource.isLossTolerant());
+ }
+
+ /**
+  * Reads a content claim, if one is present, and sets it on the given builder.
+  * <p>
+  * The claim is preceded by a one-byte marker: 0 means there is no claim and nothing else is read, 1 means a claim
+  * follows. Which fields follow depends on the serialization version: the claim id is a long before version 4,
+  * the resource offset and length exist from version 7 on, and the loss-tolerant flag exists from version 3 on.
+  *
+  * @param in the stream to read from
+  * @param serializationVersion the encoding version the stream was written with
+  * @param ffBuilder the builder that receives the content claim and the claim offset
+  * @throws IOException if the stream ends at the marker or the marker is neither 0 nor 1
+  */
+ private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException {
+     final int marker = in.read();
+     if (marker == -1) {
+         throw new EOFException();
+     }
+     if (marker == 0) {
+         // no Content Claim was written for this record
+         return;
+     }
+     if (marker != 1) {
+         throw new IOException("Claim Existence Qualifier not found in stream; found value: "
+             + marker + " after successfully restoring " + recordsRestored + " records");
+     }
+
+     final String claimId = serializationVersion < 4 ? String.valueOf(in.readLong()) : readString(in);
+     final String container = readString(in);
+     final String section = readString(in);
+
+     // Defaults apply to data written before the resource offset and length were part of the format.
+     long resourceOffset = 0L;
+     long resourceLength = -1L;
+     if (serializationVersion >= 7) {
+         resourceOffset = in.readLong();
+         resourceLength = in.readLong();
+     }
+
+     final long claimOffset = in.readLong();
+
+     // The flag is only read when the version has it; otherwise the claim is not loss tolerant.
+     final boolean lossTolerant = serializationVersion >= 3 && in.readBoolean();
+
+     final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
+     final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
+     contentClaim.setLength(resourceLength);
+
+     ffBuilder.contentClaim(contentClaim);
+     ffBuilder.contentClaimOffset(claimOffset);
+ }
+
+ /**
+  * Writes a string as a length prefix followed by its UTF-8 bytes.
+  * <p>
+  * Byte lengths below 65535 use a two-byte big-endian prefix. Longer values are written as the two-byte escape
+  * 0xFF 0xFF followed by a four-byte big-endian length.
+  *
+  * @param toWrite the string to write; must not be null
+  * @param out the stream to write to
+  * @throws IOException if writing to the stream fails
+  */
+ private void writeString(final String toWrite, final OutputStream out) throws IOException {
+     final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
+     final int utflen = bytes.length;
+
+     if (utflen < 65535) {
+         out.write(utflen >>> 8);
+         out.write(utflen);
+     } else {
+         // 0xFFFF is reserved as the marker for "a four-byte length follows"
+         out.write(255);
+         out.write(255);
+         out.write(utflen >>> 24);
+         out.write(utflen >>> 16);
+         out.write(utflen >>> 8);
+         out.write(utflen);
+     }
+
+     out.write(bytes);
+ }
+
+ /**
+  * Reads a string in the format produced by writeString: a length prefix followed by that many UTF-8 bytes.
+  *
+  * @param in the stream to read from
+  * @return the decoded string
+  * @throws EOFException if the stream ends before the length prefix or before all bytes of the value were read
+  * @throws IOException if reading from the stream fails
+  */
+ private String readString(final InputStream in) throws IOException {
+     final Integer numBytes = readFieldLength(in);
+     if (numBytes == null) {
+         // readFieldLength reports a stream that ended before the prefix as null
+         throw new EOFException();
+     }
+
+     final byte[] bytes = new byte[numBytes];
+     fillBuffer(in, bytes, numBytes);
+     return new String(bytes, StandardCharsets.UTF_8);
+ }
+
+ /**
+  * Reads the length prefix of a string field.
+  * <p>
+  * The prefix is two bytes, big-endian. The value 0xFFFF is an escape meaning that the real length follows as a
+  * four-byte big-endian integer.
+  *
+  * @param in the stream to read from
+  * @return the field length in bytes, or null if the stream was already at its end
+  * @throws EOFException if the stream ends in the middle of the prefix
+  * @throws IOException if the four-byte length is negative, which means the data is corrupt, or reading fails
+  */
+ private Integer readFieldLength(final InputStream in) throws IOException {
+     final int firstValue = in.read();
+     if (firstValue < 0) {
+         return null;
+     }
+
+     final int secondValue = in.read();
+     if (secondValue < 0) {
+         throw new EOFException();
+     }
+
+     if (firstValue != 0xff || secondValue != 0xff) {
+         return (firstValue << 8) + secondValue;
+     }
+
+     final int ch1 = in.read();
+     final int ch2 = in.read();
+     final int ch3 = in.read();
+     final int ch4 = in.read();
+     if ((ch1 | ch2 | ch3 | ch4) < 0) {
+         throw new EOFException();
+     }
+
+     final int length = (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
+     if (length < 0) {
+         // A high bit in the first byte cannot come from a real array length; report it as bad data rather than
+         // letting the caller fail while allocating a negative-sized buffer.
+         throw new IOException("Found negative field length " + length + " in stream; the data appears to be corrupt");
+     }
+     return length;
+ }
+
+ /**
+  * Reads from the stream into the start of the buffer until the requested number of bytes has been read.
+  *
+  * @param in the stream to read from
+  * @param buffer the destination; filled starting at index 0
+  * @param length the number of bytes to read
+  * @throws EOFException if reading stops before {@code length} bytes were read
+  * @throws IOException if reading from the stream fails
+  */
+ private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
+     int filled = 0;
+     for (;;) {
+         final int count = in.read(buffer, filled, length - filled);
+         if (count <= 0) {
+             // either the stream is exhausted or nothing is left to request
+             break;
+         }
+         filled += count;
+     }
+
+     if (filled != length) {
+         throw new EOFException();
+     }
+ }
+
+ /**
+ * @return the encoding version that this serde writes, CURRENT_ENCODING_VERSION
+ */
+ @Override
+ public int getVersion() {
+ return CURRENT_ENCODING_VERSION;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
index 25dbaee..7e87199 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
@@ -64,29 +64,6 @@ public class StandardResourceClaim implements ResourceClaim, Comparable<Resource
return section;
}
- /**
- * Provides the natural ordering for ResourceClaim objects. By default they are sorted by their id, then container, then section
- *
- * @param other other claim
- * @return x such that x <=1 if this is less than other;
- * x=0 if this.equals(other);
- * x >= 1 if this is greater than other
- */
- @Override
- public int compareTo(final ResourceClaim other) {
- final int idComparison = id.compareTo(other.getId());
- if (idComparison != 0) {
- return idComparison;
- }
-
- final int containerComparison = container.compareTo(other.getContainer());
- if (containerComparison != 0) {
- return containerComparison;
- }
-
- return section.compareTo(other.getSection());
- }
-
@Override
public boolean equals(final Object other) {
if (this == other) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 7d554b1..e4f060e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -29,10 +29,9 @@ import org.slf4j.LoggerFactory;
public class StandardResourceClaimManager implements ResourceClaimManager {
- private static final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();
private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);
-
- private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
+ private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();
+ private final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);
@Override
public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) {
@@ -50,7 +49,7 @@ public class StandardResourceClaimManager implements ResourceClaimManager {
return (count == null) ? null : count.getClaim();
}
- private static AtomicInteger getCounter(final ResourceClaim claim) {
+ private AtomicInteger getCounter(final ResourceClaim claim) {
if (claim == null) {
return null;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java
deleted file mode 100644
index 7de25ac..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingInputStream.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ByteCountingInputStream extends InputStream {
-
- private final AtomicLong bytesReadHolder;
- private final InputStream in;
- private long bytesSkipped = 0L;
-
- public ByteCountingInputStream(final InputStream in, final AtomicLong longHolder) {
- this.in = in;
- this.bytesReadHolder = longHolder;
- }
-
- @Override
- public int read() throws IOException {
- final int fromSuper = in.read();
- if (fromSuper >= 0) {
- bytesReadHolder.getAndIncrement();
- }
- return fromSuper;
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
- final int fromSuper = in.read(b, off, len);
- if (fromSuper >= 0) {
- bytesReadHolder.getAndAdd(fromSuper);
- }
-
- return fromSuper;
- }
-
- @Override
- public int read(byte[] b) throws IOException {
- return read(b, 0, b.length);
- }
-
- @Override
- public long skip(final long n) throws IOException {
- final long skipped = in.skip(n);
- bytesSkipped += skipped;
- return skipped;
- }
-
- @Override
- public int available() throws IOException {
- return in.available();
- }
-
- @Override
- public void mark(int readlimit) {
- in.mark(readlimit);
- }
-
- @Override
- public boolean markSupported() {
- return in.markSupported();
- }
-
- @Override
- public void reset() throws IOException {
- in.reset();
- }
-
- @Override
- public void close() throws IOException {
- in.close();
- }
-
- public long getBytesRead() {
- return bytesReadHolder.get();
- }
-
- public long getBytesSkipped() {
- return bytesSkipped;
- }
-
- public long getStreamLocation() {
- return getBytesRead() + getBytesSkipped();
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
deleted file mode 100644
index 7c778a2..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/io/ByteCountingOutputStream.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.repository.io;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ByteCountingOutputStream extends OutputStream {
-
- private final AtomicLong bytesWrittenHolder;
- private final OutputStream out;
-
- public ByteCountingOutputStream(final OutputStream out, final AtomicLong longHolder) {
- this.out = out;
- this.bytesWrittenHolder = longHolder;
- }
-
- @Override
- public void write(int b) throws IOException {
- out.write(b);
- bytesWrittenHolder.getAndIncrement();
- }
-
- @Override
- public void write(byte[] b) throws IOException {
- write(b, 0, b.length);
- }
-
- @Override
- public void write(byte[] b, int off, int len) throws IOException {
- out.write(b, off, len);
- bytesWrittenHolder.getAndAdd(len);
- }
-
- public long getBytesWritten() {
- return bytesWrittenHolder.get();
- }
-
- @Override
- public void flush() throws IOException {
- out.flush();
- }
-
- @Override
- public void close() throws IOException {
- out.close();
- }
-}