Posted to commits@nifi.apache.org by oz...@apache.org on 2016/11/18 19:54:02 UTC
[2/7] nifi git commit: NIFI-2854: Refactor repositories and swap
files to use schema-based serialization so that NiFi can be rolled back to a
previous version after an upgrade.
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
index 35832c4..a95bd4f 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/StandardRecordWriter.java
@@ -17,239 +17,142 @@
package org.apache.nifi.provenance;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
import org.apache.nifi.provenance.serialization.RecordWriter;
import org.apache.nifi.provenance.toc.TocWriter;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.DataOutputStream;
-import org.apache.nifi.stream.io.GZIPOutputStream;
-import org.apache.nifi.stream.io.NonCloseableOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StandardRecordWriter implements RecordWriter {
+/**
+ * @deprecated Deprecated in favor of SchemaRecordWriter
+ */
+@Deprecated
+public class StandardRecordWriter extends CompressableRecordWriter implements RecordWriter {
private static final Logger logger = LoggerFactory.getLogger(StandardRecordWriter.class);
+ public static final int SERIALIZATION_VERISON = 9;
+ public static final String SERIALIZATION_NAME = "org.apache.nifi.provenance.PersistentProvenanceRepository";
private final File file;
- private final FileOutputStream fos;
- private final ByteCountingOutputStream rawOutStream;
- private final TocWriter tocWriter;
- private final boolean compressed;
- private final int uncompressedBlockSize;
- private final AtomicBoolean dirtyFlag = new AtomicBoolean(false);
-
- private DataOutputStream out;
- private ByteCountingOutputStream byteCountingOut;
- private long lastBlockOffset = 0L;
- private int recordCount = 0;
- private volatile boolean closed = false;
-
- private final Lock lock = new ReentrantLock();
public StandardRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+ super(file, writer, compressed, uncompressedBlockSize);
logger.trace("Creating Record Writer for {}", file.getName());
this.file = file;
- this.compressed = compressed;
- this.fos = new FileOutputStream(file);
- rawOutStream = new ByteCountingOutputStream(fos);
- this.uncompressedBlockSize = uncompressedBlockSize;
+ }
- this.tocWriter = writer;
+ public StandardRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+ super(out, tocWriter, compressed, uncompressedBlockSize);
+ this.file = null;
}
@Override
- public synchronized File getFile() {
- return file;
+ protected String getSerializationName() {
+ return SERIALIZATION_NAME;
}
@Override
- public synchronized void writeHeader(final long firstEventId) throws IOException {
- if (isDirty()) {
- throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
- }
-
- try {
- lastBlockOffset = rawOutStream.getBytesWritten();
- resetWriteStream(firstEventId);
-
- out.writeUTF(PersistentProvenanceRepository.class.getName());
- out.writeInt(PersistentProvenanceRepository.SERIALIZATION_VERSION);
- out.flush();
- } catch (final IOException ioe) {
- markDirty();
- throw ioe;
- }
+ protected int getSerializationVersion() {
+ return SERIALIZATION_VERISON;
}
-
- /**
- * Resets the streams to prepare for a new block
- * @param eventId the first id that will be written to the new block
- * @throws IOException if unable to flush/close the current streams properly
- */
- private void resetWriteStream(final long eventId) throws IOException {
- try {
- if (out != null) {
- out.flush();
- }
-
- final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
-
- final OutputStream writableStream;
- if ( compressed ) {
- // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
- // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
- // the underlying OutputStream in a NonCloseableOutputStream
- // We don't have to check if the writer is dirty because we will have already checked before calling this method.
- if ( out != null ) {
- out.close();
- }
-
- if ( tocWriter != null ) {
- tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
- }
-
- writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
- } else {
- if ( tocWriter != null ) {
- tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
- }
-
- writableStream = new BufferedOutputStream(rawOutStream, 65536);
- }
-
- this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
- this.out = new DataOutputStream(byteCountingOut);
- dirtyFlag.set(false);
- } catch (final IOException ioe) {
- markDirty();
- throw ioe;
- }
+ @Override
+ protected void writeHeader(long firstEventId, DataOutputStream out) throws IOException {
}
@Override
- public synchronized long writeRecord(final ProvenanceEventRecord record, final long recordIdentifier) throws IOException {
- if (isDirty()) {
- throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
+ protected void writeRecord(final ProvenanceEventRecord record, final long recordIdentifier, final DataOutputStream out) throws IOException {
+ final ProvenanceEventType recordType = record.getEventType();
+
+ out.writeLong(recordIdentifier);
+ out.writeUTF(record.getEventType().name());
+ out.writeLong(record.getEventTime());
+ out.writeLong(record.getFlowFileEntryDate());
+ out.writeLong(record.getEventDuration());
+ out.writeLong(record.getLineageStartDate());
+
+ writeNullableString(out, record.getComponentId());
+ writeNullableString(out, record.getComponentType());
+ writeUUID(out, record.getFlowFileUuid());
+ writeNullableString(out, record.getDetails());
+
+ // Write FlowFile attributes
+ final Map<String, String> attrs = record.getPreviousAttributes();
+ out.writeInt(attrs.size());
+ for (final Map.Entry<String, String> entry : attrs.entrySet()) {
+ writeLongString(out, entry.getKey());
+ writeLongString(out, entry.getValue());
}
- try {
- final ProvenanceEventType recordType = record.getEventType();
- final long startBytes = byteCountingOut.getBytesWritten();
-
- // add a new block to the TOC if needed.
- if ( tocWriter != null && (startBytes - lastBlockOffset >= uncompressedBlockSize) ) {
- lastBlockOffset = startBytes;
-
- if ( compressed ) {
- // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
- // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
- // the underlying OutputStream in a NonCloseableOutputStream
- resetWriteStream(recordIdentifier);
- }
- }
-
- out.writeLong(recordIdentifier);
- out.writeUTF(record.getEventType().name());
- out.writeLong(record.getEventTime());
- out.writeLong(record.getFlowFileEntryDate());
- out.writeLong(record.getEventDuration());
- out.writeLong(record.getLineageStartDate());
-
- writeNullableString(out, record.getComponentId());
- writeNullableString(out, record.getComponentType());
- writeUUID(out, record.getFlowFileUuid());
- writeNullableString(out, record.getDetails());
-
- // Write FlowFile attributes
- final Map<String, String> attrs = record.getPreviousAttributes();
- out.writeInt(attrs.size());
- for (final Map.Entry<String, String> entry : attrs.entrySet()) {
- writeLongString(out, entry.getKey());
- writeLongString(out, entry.getValue());
- }
-
- final Map<String, String> attrUpdates = record.getUpdatedAttributes();
- out.writeInt(attrUpdates.size());
- for (final Map.Entry<String, String> entry : attrUpdates.entrySet()) {
- writeLongString(out, entry.getKey());
- writeLongNullableString(out, entry.getValue());
- }
+ final Map<String, String> attrUpdates = record.getUpdatedAttributes();
+ out.writeInt(attrUpdates.size());
+ for (final Map.Entry<String, String> entry : attrUpdates.entrySet()) {
+ writeLongString(out, entry.getKey());
+ writeLongNullableString(out, entry.getValue());
+ }
- // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
- if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
- out.writeBoolean(true);
- out.writeUTF(record.getContentClaimContainer());
- out.writeUTF(record.getContentClaimSection());
- out.writeUTF(record.getContentClaimIdentifier());
- if (record.getContentClaimOffset() == null) {
- out.writeLong(0L);
- } else {
- out.writeLong(record.getContentClaimOffset());
- }
- out.writeLong(record.getFileSize());
+ // If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
+ if (record.getContentClaimSection() != null && record.getContentClaimContainer() != null && record.getContentClaimIdentifier() != null) {
+ out.writeBoolean(true);
+ out.writeUTF(record.getContentClaimContainer());
+ out.writeUTF(record.getContentClaimSection());
+ out.writeUTF(record.getContentClaimIdentifier());
+ if (record.getContentClaimOffset() == null) {
+ out.writeLong(0L);
} else {
- out.writeBoolean(false);
+ out.writeLong(record.getContentClaimOffset());
}
+ out.writeLong(record.getFileSize());
+ } else {
+ out.writeBoolean(false);
+ }
- // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
- if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) {
- out.writeBoolean(true);
- out.writeUTF(record.getPreviousContentClaimContainer());
- out.writeUTF(record.getPreviousContentClaimSection());
- out.writeUTF(record.getPreviousContentClaimIdentifier());
- if (record.getPreviousContentClaimOffset() == null) {
- out.writeLong(0L);
- } else {
- out.writeLong(record.getPreviousContentClaimOffset());
- }
-
- if (record.getPreviousFileSize() == null) {
- out.writeLong(0L);
- } else {
- out.writeLong(record.getPreviousFileSize());
- }
+ // If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
+ if (record.getPreviousContentClaimSection() != null && record.getPreviousContentClaimContainer() != null && record.getPreviousContentClaimIdentifier() != null) {
+ out.writeBoolean(true);
+ out.writeUTF(record.getPreviousContentClaimContainer());
+ out.writeUTF(record.getPreviousContentClaimSection());
+ out.writeUTF(record.getPreviousContentClaimIdentifier());
+ if (record.getPreviousContentClaimOffset() == null) {
+ out.writeLong(0L);
} else {
- out.writeBoolean(false);
+ out.writeLong(record.getPreviousContentClaimOffset());
}
- // write out the identifier of the destination queue.
- writeNullableString(out, record.getSourceQueueIdentifier());
-
- // Write type-specific info
- if (recordType == ProvenanceEventType.FORK || recordType == ProvenanceEventType.JOIN || recordType == ProvenanceEventType.CLONE || recordType == ProvenanceEventType.REPLAY) {
- writeUUIDs(out, record.getParentUuids());
- writeUUIDs(out, record.getChildUuids());
- } else if (recordType == ProvenanceEventType.RECEIVE) {
- writeNullableString(out, record.getTransitUri());
- writeNullableString(out, record.getSourceSystemFlowFileIdentifier());
- } else if (recordType == ProvenanceEventType.FETCH) {
- writeNullableString(out, record.getTransitUri());
- } else if (recordType == ProvenanceEventType.SEND) {
- writeNullableString(out, record.getTransitUri());
- } else if (recordType == ProvenanceEventType.ADDINFO) {
- writeNullableString(out, record.getAlternateIdentifierUri());
- } else if (recordType == ProvenanceEventType.ROUTE) {
- writeNullableString(out, record.getRelationship());
+ if (record.getPreviousFileSize() == null) {
+ out.writeLong(0L);
+ } else {
+ out.writeLong(record.getPreviousFileSize());
}
+ } else {
+ out.writeBoolean(false);
+ }
- out.flush();
- recordCount++;
- return byteCountingOut.getBytesWritten() - startBytes;
- } catch (final IOException ioe) {
- markDirty();
- throw ioe;
+ // write out the identifier of the destination queue.
+ writeNullableString(out, record.getSourceQueueIdentifier());
+
+ // Write type-specific info
+ if (recordType == ProvenanceEventType.FORK || recordType == ProvenanceEventType.JOIN || recordType == ProvenanceEventType.CLONE || recordType == ProvenanceEventType.REPLAY) {
+ writeUUIDs(out, record.getParentUuids());
+ writeUUIDs(out, record.getChildUuids());
+ } else if (recordType == ProvenanceEventType.RECEIVE) {
+ writeNullableString(out, record.getTransitUri());
+ writeNullableString(out, record.getSourceSystemFlowFileIdentifier());
+ } else if (recordType == ProvenanceEventType.FETCH) {
+ writeNullableString(out, record.getTransitUri());
+ } else if (recordType == ProvenanceEventType.SEND) {
+ writeNullableString(out, record.getTransitUri());
+ } else if (recordType == ProvenanceEventType.ADDINFO) {
+ writeNullableString(out, record.getAlternateIdentifierUri());
+ } else if (recordType == ProvenanceEventType.ROUTE) {
+ writeNullableString(out, record.getRelationship());
}
}
@@ -292,110 +195,9 @@ public class StandardRecordWriter implements RecordWriter {
out.write(bytes);
}
- @Override
- public synchronized void close() throws IOException {
- closed = true;
-
- logger.trace("Closing Record Writer for {}", file.getName());
-
- lock();
- try {
- try {
- // We want to close 'out' only if the writer is not 'dirty'.
- // If the writer is dirty, then there was a failure to write
- // to disk, which means that we likely have a partial record written
- // to disk.
- //
- // If we call close() on out, it will in turn call flush() on the underlying
- // output stream, which is a BufferedOutputStream. As a result, we will end
- // up flushing the buffer after a partially written record, which results in
- // essentially random bytes being written to the repository, which causes
- // corruption and un-recoverability. Since we will close the underlying 'rawOutStream'
- // below, we will still appropriately clean up the resources held by this writer, so
- // we are still OK in terms of closing all resources held by the writer.
- if (out != null && !isDirty()) {
- out.close();
- }
- } finally {
- try {
- rawOutStream.close();
- } finally {
- if (tocWriter != null) {
- tocWriter.close();
- }
- }
- }
- } catch (final IOException ioe) {
- markDirty();
- throw ioe;
- } finally {
- unlock();
- }
- }
-
- @Override
- public boolean isClosed() {
- return closed;
- }
-
- @Override
- public synchronized int getRecordsWritten() {
- return recordCount;
- }
-
- @Override
- public void lock() {
- lock.lock();
- }
-
- @Override
- public void unlock() {
- lock.unlock();
- }
-
- @Override
- public boolean tryLock() {
- final boolean obtainedLock = lock.tryLock();
- if (obtainedLock && dirtyFlag.get()) {
- // once we have obtained the lock, we need to check if the writer
- // has been marked dirty. If so, we cannot write to the underlying
- // file, so we need to unlock and return false. Otherwise, it's okay
- // to write to the underlying file, so return true.
- lock.unlock();
- return false;
- }
- return obtainedLock;
- }
@Override
public String toString() {
return "StandardRecordWriter[file=" + file + "]";
}
-
- @Override
- public void sync() throws IOException {
- try {
- if ( tocWriter != null ) {
- tocWriter.sync();
- }
- fos.getFD().sync();
- } catch (final IOException ioe) {
- markDirty();
- throw ioe;
- }
- }
-
- @Override
- public TocWriter getTocWriter() {
- return tocWriter;
- }
-
- @Override
- public void markDirty() {
- dirtyFlag.set(true);
- }
-
- public boolean isDirty() {
- return dirtyFlag.get();
- }
}
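
The comments removed above (and retained in the new CompressableRecordWriter) explain the compression scheme: GZIPOutputStream only emits its trailing bytes on close(), so each block is finished by closing the gzip stream while a close-suppressing wrapper keeps the underlying file stream open. A minimal standalone sketch of that pattern, using plain java.util.zip and a hypothetical NoCloseOutputStream standing in for NiFi's NonCloseableOutputStream:

    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.util.zip.GZIPOutputStream;

    public class GzipBlockSketch {

        // Hypothetical stand-in for org.apache.nifi.stream.io.NonCloseableOutputStream:
        // close() flushes the wrapped stream but never closes it.
        static class NoCloseOutputStream extends FilterOutputStream {
            NoCloseOutputStream(final OutputStream out) {
                super(out);
            }

            @Override
            public void close() throws IOException {
                flush();
            }
        }

        // Write one block as a complete gzip member. Closing the GZIPOutputStream
        // forces its trailer out; the underlying stream stays open so the next
        // block can be appended at a known raw byte offset.
        static void writeBlock(final OutputStream fileOut, final byte[] blockData) throws IOException {
            final GZIPOutputStream gzipOut = new GZIPOutputStream(new NoCloseOutputStream(fileOut));
            gzipOut.write(blockData);
            gzipOut.close();
        }
    }

Because every block is a self-contained gzip member, the Table of Contents can record raw byte offsets that a reader seeks to before opening a fresh decompression stream.
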
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
index f474661..f725208 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/lucene/IndexingAction.java
@@ -19,7 +19,6 @@ package org.apache.nifi.provenance.lucene;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import org.apache.lucene.document.Document;
@@ -45,7 +44,7 @@ public class IndexingAction {
}
private void addField(final Document doc, final SearchableField field, final String value, final Store store) {
- if (value == null || (!nonAttributeSearchableFields.contains(field) && !field.isAttribute())) {
+ if (value == null || (!field.isAttribute() && !nonAttributeSearchableFields.contains(field))) {
return;
}
@@ -54,11 +53,9 @@ public class IndexingAction {
public void index(final StandardProvenanceEventRecord record, final IndexWriter indexWriter, final Integer blockIndex) throws IOException {
- final Map<String, String> attributes = record.getAttributes();
-
final Document doc = new Document();
addField(doc, SearchableFields.FlowFileUUID, record.getFlowFileUuid(), Store.NO);
- addField(doc, SearchableFields.Filename, attributes.get(CoreAttributes.FILENAME.key()), Store.NO);
+ addField(doc, SearchableFields.Filename, record.getAttribute(CoreAttributes.FILENAME.key()), Store.NO);
addField(doc, SearchableFields.ComponentID, record.getComponentId(), Store.NO);
addField(doc, SearchableFields.AlternateIdentifierURI, record.getAlternateIdentifierUri(), Store.NO);
addField(doc, SearchableFields.EventType, record.getEventType().name(), Store.NO);
@@ -68,13 +65,10 @@ public class IndexingAction {
addField(doc, SearchableFields.ContentClaimContainer, record.getContentClaimContainer(), Store.NO);
addField(doc, SearchableFields.ContentClaimIdentifier, record.getContentClaimIdentifier(), Store.NO);
addField(doc, SearchableFields.SourceQueueIdentifier, record.getSourceQueueIdentifier(), Store.NO);
-
- if (nonAttributeSearchableFields.contains(SearchableFields.TransitURI)) {
- addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
- }
+ addField(doc, SearchableFields.TransitURI, record.getTransitUri(), Store.NO);
for (final SearchableField searchableField : attributeSearchableFields) {
- addField(doc, searchableField, LuceneUtil.truncateIndexField(attributes.get(searchableField.getSearchableFieldName())), Store.NO);
+ addField(doc, searchableField, LuceneUtil.truncateIndexField(record.getAttribute(searchableField.getSearchableFieldName())), Store.NO);
}
final String storageFilename = LuceneUtil.substringBefore(record.getStorageFilename(), ".");
@@ -94,19 +88,20 @@ public class IndexingAction {
}
// If the event is a FORK or JOIN, add the FlowFileUUID for all child/parent UUIDs.
- if (record.getEventType() == ProvenanceEventType.FORK || record.getEventType() == ProvenanceEventType.CLONE || record.getEventType() == ProvenanceEventType.REPLAY) {
+ final ProvenanceEventType eventType = record.getEventType();
+ if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) {
for (final String uuid : record.getChildUuids()) {
if (!uuid.equals(record.getFlowFileUuid())) {
addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
}
}
- } else if (record.getEventType() == ProvenanceEventType.JOIN) {
+ } else if (eventType == ProvenanceEventType.JOIN) {
for (final String uuid : record.getParentUuids()) {
if (!uuid.equals(record.getFlowFileUuid())) {
addField(doc, SearchableFields.FlowFileUUID, uuid, Store.NO);
}
}
- } else if (record.getEventType() == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
+ } else if (eventType == ProvenanceEventType.RECEIVE && record.getSourceSystemFlowFileIdentifier() != null) {
// If we get a receive with a Source System FlowFile Identifier, we add another Document that shows the UUID
// that the Source System uses to refer to the data.
final String sourceIdentifier = record.getSourceSystemFlowFileIdentifier();
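
The RECEIVE branch above is the one case where indexing emits a second Lucene document: it indexes the identifier that the source system used for the FlowFile, so a search on either UUID surfaces the event. A hedged sketch of that idea with stock Lucene classes (the field name and field type here are illustrative, not the exact output of the repository's addField() helper):

    import java.io.IOException;

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field.Store;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.index.IndexWriter;

    public class ReceiveEventIndexSketch {

        // Add an extra document keyed by the source system's identifier; it is
        // indexed but not stored, since only the lookup matters.
        static void indexSourceSystemId(final IndexWriter indexWriter, final String sourceSystemId) throws IOException {
            final Document doc = new Document();
            doc.add(new StringField("uuid", sourceSystemId, Store.NO));
            indexWriter.addDocument(doc);
        }
    }
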
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
new file mode 100644
index 0000000..c9e7dc8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecord.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.repository.schema.FieldMapRecord;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class EventRecord implements Record {
+ private final RecordSchema schema;
+ private final ProvenanceEventRecord event;
+ private final long eventId;
+ private final Record contentClaimRecord;
+ private final Record previousClaimRecord;
+
+ public EventRecord(final ProvenanceEventRecord event, final long eventId, final RecordSchema schema, final RecordSchema contentClaimSchema) {
+ this.schema = schema;
+ this.event = event;
+ this.eventId = eventId;
+ this.contentClaimRecord = createContentClaimRecord(contentClaimSchema, event.getContentClaimContainer(), event.getContentClaimSection(),
+ event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize());
+ this.previousClaimRecord = createContentClaimRecord(contentClaimSchema, event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(),
+ event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset(), event.getPreviousFileSize());
+ }
+
+ @Override
+ public RecordSchema getSchema() {
+ return schema;
+ }
+
+ private static Record createContentClaimRecord(final RecordSchema contentClaimSchema, final String container, final String section,
+ final String identifier, final Long offset, final Long size) {
+ if (container == null || section == null || identifier == null) {
+ return null;
+ }
+
+ final Map<RecordField, Object> fieldValues = new HashMap<>();
+ fieldValues.put(EventRecordFields.CONTENT_CLAIM_CONTAINER, container);
+ fieldValues.put(EventRecordFields.CONTENT_CLAIM_SECTION, section);
+ fieldValues.put(EventRecordFields.CONTENT_CLAIM_IDENTIFIER, identifier);
+ fieldValues.put(EventRecordFields.CONTENT_CLAIM_OFFSET, offset);
+ fieldValues.put(EventRecordFields.CONTENT_CLAIM_SIZE, size);
+ return new FieldMapRecord(fieldValues, contentClaimSchema);
+ }
+
+ @Override
+ public Object getFieldValue(final String fieldName) {
+ switch (fieldName) {
+ case EventRecordFields.Names.EVENT_IDENTIFIER:
+ return eventId;
+ case EventRecordFields.Names.ALTERNATE_IDENTIFIER:
+ return event.getAlternateIdentifierUri();
+ case EventRecordFields.Names.CHILD_UUIDS:
+ return event.getChildUuids();
+ case EventRecordFields.Names.COMPONENT_ID:
+ return event.getComponentId();
+ case EventRecordFields.Names.COMPONENT_TYPE:
+ return event.getComponentType();
+ case EventRecordFields.Names.CONTENT_CLAIM:
+ return contentClaimRecord;
+ case EventRecordFields.Names.EVENT_DETAILS:
+ return event.getDetails();
+ case EventRecordFields.Names.EVENT_DURATION:
+ return event.getEventDuration();
+ case EventRecordFields.Names.EVENT_TIME:
+ return event.getEventTime();
+ case EventRecordFields.Names.EVENT_TYPE:
+ return event.getEventType().name();
+ case EventRecordFields.Names.FLOWFILE_ENTRY_DATE:
+ return event.getFlowFileEntryDate();
+ case EventRecordFields.Names.FLOWFILE_UUID:
+ return event.getFlowFileUuid();
+ case EventRecordFields.Names.LINEAGE_START_DATE:
+ return event.getLineageStartDate();
+ case EventRecordFields.Names.PARENT_UUIDS:
+ return event.getParentUuids();
+ case EventRecordFields.Names.PREVIOUS_ATTRIBUTES:
+ return event.getPreviousAttributes();
+ case EventRecordFields.Names.PREVIOUS_CONTENT_CLAIM:
+ return previousClaimRecord;
+ case EventRecordFields.Names.RELATIONSHIP:
+ return event.getRelationship();
+ case EventRecordFields.Names.SOURCE_QUEUE_IDENTIFIER:
+ return event.getSourceQueueIdentifier();
+ case EventRecordFields.Names.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER:
+ return event.getSourceSystemFlowFileIdentifier();
+ case EventRecordFields.Names.TRANSIT_URI:
+ return event.getTransitUri();
+ case EventRecordFields.Names.UPDATED_ATTRIBUTES:
+ return event.getUpdatedAttributes();
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static StandardProvenanceEventRecord getEvent(final Record record, final String storageFilename, final long storageByteOffset, final int maxAttributeLength) {
+ final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
+ builder.setAlternateIdentifierUri((String) record.getFieldValue(EventRecordFields.Names.ALTERNATE_IDENTIFIER));
+ builder.setChildUuids((List<String>) record.getFieldValue(EventRecordFields.Names.CHILD_UUIDS));
+ builder.setComponentId((String) record.getFieldValue(EventRecordFields.Names.COMPONENT_ID));
+ builder.setComponentType((String) record.getFieldValue(EventRecordFields.Names.COMPONENT_TYPE));
+ builder.setDetails((String) record.getFieldValue(EventRecordFields.Names.EVENT_DETAILS));
+ builder.setEventDuration((Long) record.getFieldValue(EventRecordFields.Names.EVENT_DURATION));
+ builder.setEventTime((Long) record.getFieldValue(EventRecordFields.Names.EVENT_TIME));
+ builder.setEventType(ProvenanceEventType.valueOf((String) record.getFieldValue(EventRecordFields.Names.EVENT_TYPE)));
+ builder.setFlowFileEntryDate((Long) record.getFieldValue(EventRecordFields.Names.FLOWFILE_ENTRY_DATE));
+ builder.setFlowFileUUID((String) record.getFieldValue(EventRecordFields.Names.FLOWFILE_UUID));
+ builder.setLineageStartDate((Long) record.getFieldValue(EventRecordFields.Names.LINEAGE_START_DATE));
+ builder.setParentUuids((List<String>) record.getFieldValue(EventRecordFields.Names.PARENT_UUIDS));
+ builder.setPreviousAttributes(truncateAttributes((Map<String, String>) record.getFieldValue(EventRecordFields.Names.PREVIOUS_ATTRIBUTES), maxAttributeLength));
+ builder.setEventId((Long) record.getFieldValue(EventRecordFields.Names.EVENT_IDENTIFIER));
+ builder.setRelationship((String) record.getFieldValue(EventRecordFields.Names.RELATIONSHIP));
+ builder.setSourceQueueIdentifier((String) record.getFieldValue(EventRecordFields.Names.SOURCE_QUEUE_IDENTIFIER));
+ builder.setSourceSystemFlowFileIdentifier((String) record.getFieldValue(EventRecordFields.Names.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER));
+ builder.setTransitUri((String) record.getFieldValue(EventRecordFields.Names.TRANSIT_URI));
+ builder.setUpdatedAttributes(truncateAttributes((Map<String, String>) record.getFieldValue(EventRecordFields.Names.UPDATED_ATTRIBUTES), maxAttributeLength));
+
+ builder.setStorageLocation(storageFilename, storageByteOffset);
+
+ final Record currentClaimRecord = (Record) record.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM);
+ if (currentClaimRecord == null) {
+ builder.setCurrentContentClaim(null, null, null, null, 0L);
+ } else {
+ builder.setCurrentContentClaim(
+ (String) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_CONTAINER),
+ (String) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SECTION),
+ (String) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_IDENTIFIER),
+ (Long) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_OFFSET),
+ (Long) currentClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SIZE));
+ }
+
+ final Record previousClaimRecord = (Record) record.getFieldValue(EventRecordFields.Names.PREVIOUS_CONTENT_CLAIM);
+ if (previousClaimRecord != null) {
+ builder.setPreviousContentClaim(
+ (String) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_CONTAINER),
+ (String) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SECTION),
+ (String) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_IDENTIFIER),
+ (Long) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_OFFSET),
+ (Long) previousClaimRecord.getFieldValue(EventRecordFields.Names.CONTENT_CLAIM_SIZE));
+ }
+
+ return builder.build();
+ }
+
+ private static Map<String, String> truncateAttributes(final Map<String, String> attributes, final int maxAttributeLength) {
+ if (attributes == null) {
+ return null;
+ }
+
+        // Check if any attribute value exceeds the maximum attribute length
+ final boolean anyExceedsLength = attributes.values().stream()
+ .filter(value -> value != null)
+ .anyMatch(value -> value.length() > maxAttributeLength);
+
+ if (!anyExceedsLength) {
+ return attributes;
+ }
+
+ final Map<String, String> truncated = new HashMap<>();
+ for (final Map.Entry<String, String> entry : attributes.entrySet()) {
+ final String key = entry.getKey();
+ final String value = entry.getValue();
+
+ if (value == null || value.length() <= maxAttributeLength) {
+ truncated.put(key, value);
+ continue;
+ }
+
+ truncated.put(key, value.substring(0, maxAttributeLength));
+ }
+
+ return truncated;
+ }
+}
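
EventRecord adapts a ProvenanceEventRecord to the generic Record interface through name-based field lookup, and the static getEvent() performs the reverse mapping when deserializing. A sketch of the wrapping side; the content-claim sub-schema assembled here is an assumption, built from the five claim fields this class reads and writes:

    import java.util.Arrays;

    import org.apache.nifi.provenance.ProvenanceEventRecord;
    import org.apache.nifi.provenance.schema.EventRecord;
    import org.apache.nifi.provenance.schema.EventRecordFields;
    import org.apache.nifi.provenance.schema.ProvenanceEventSchema;
    import org.apache.nifi.repository.schema.RecordSchema;

    public class EventRecordSketch {

        // Wrap an event for schema-based serialization, then read one field back
        // by its schema name rather than by a hard-coded stream position.
        static Object eventTypeOf(final ProvenanceEventRecord event) {
            final RecordSchema claimSchema = new RecordSchema(Arrays.asList(
                    EventRecordFields.CONTENT_CLAIM_CONTAINER,
                    EventRecordFields.CONTENT_CLAIM_SECTION,
                    EventRecordFields.CONTENT_CLAIM_IDENTIFIER,
                    EventRecordFields.CONTENT_CLAIM_OFFSET,
                    EventRecordFields.CONTENT_CLAIM_SIZE));

            final EventRecord wrapped = new EventRecord(event, 42L,
                    ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1, claimSchema);
            return wrapped.getFieldValue(EventRecordFields.Names.EVENT_TYPE);
        }
    }
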
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
new file mode 100644
index 0000000..0582dd8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/EventRecordFields.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import static org.apache.nifi.repository.schema.Repetition.EXACTLY_ONE;
+import static org.apache.nifi.repository.schema.Repetition.ZERO_OR_MORE;
+import static org.apache.nifi.repository.schema.Repetition.ZERO_OR_ONE;
+
+import org.apache.nifi.repository.schema.ComplexRecordField;
+import org.apache.nifi.repository.schema.FieldType;
+import org.apache.nifi.repository.schema.MapRecordField;
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.SimpleRecordField;
+
+public class EventRecordFields {
+
+ public static class Names {
+ public static final String EVENT_IDENTIFIER = "Event ID";
+ public static final String EVENT_TYPE = "Event Type";
+ public static final String EVENT_TIME = "Event Time";
+ public static final String FLOWFILE_ENTRY_DATE = "FlowFile Entry Date";
+ public static final String EVENT_DURATION = "Event Duration";
+ public static final String LINEAGE_START_DATE = "Lineage Start Date";
+ public static final String COMPONENT_ID = "Component ID";
+ public static final String COMPONENT_TYPE = "Component Type";
+ public static final String FLOWFILE_UUID = "FlowFile UUID";
+ public static final String EVENT_DETAILS = "Event Details";
+ public static final String SOURCE_QUEUE_IDENTIFIER = "Source Queue Identifier";
+ public static final String CONTENT_CLAIM = "Content Claim";
+ public static final String PREVIOUS_CONTENT_CLAIM = "Previous Content Claim";
+ public static final String PARENT_UUIDS = "Parent UUIDs";
+ public static final String CHILD_UUIDS = "Child UUIDs";
+
+ public static final String ATTRIBUTE_NAME = "Attribute Name";
+ public static final String ATTRIBUTE_VALUE = "Attribute Value";
+ public static final String PREVIOUS_ATTRIBUTES = "Previous Attributes";
+ public static final String UPDATED_ATTRIBUTES = "Updated Attributes";
+
+ public static final String CONTENT_CLAIM_CONTAINER = "Content Claim Container";
+ public static final String CONTENT_CLAIM_SECTION = "Content Claim Section";
+ public static final String CONTENT_CLAIM_IDENTIFIER = "Content Claim Identifier";
+ public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset";
+ public static final String CONTENT_CLAIM_SIZE = "Content Claim Size";
+
+ public static final String TRANSIT_URI = "Transit URI";
+ public static final String SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = "Source System FlowFile Identifier";
+ public static final String ALTERNATE_IDENTIFIER = "Alternate Identifier";
+ public static final String RELATIONSHIP = "Relationship";
+ }
+
+ // General Event fields.
+ public static final RecordField RECORD_IDENTIFIER = new SimpleRecordField(Names.EVENT_IDENTIFIER, FieldType.LONG, EXACTLY_ONE);
+ public static final RecordField EVENT_TYPE = new SimpleRecordField(Names.EVENT_TYPE, FieldType.STRING, EXACTLY_ONE);
+ public static final RecordField EVENT_TIME = new SimpleRecordField(Names.EVENT_TIME, FieldType.LONG, EXACTLY_ONE);
+ public static final RecordField FLOWFILE_ENTRY_DATE = new SimpleRecordField(Names.FLOWFILE_ENTRY_DATE, FieldType.LONG, EXACTLY_ONE);
+ public static final RecordField EVENT_DURATION = new SimpleRecordField(Names.EVENT_DURATION, FieldType.LONG, EXACTLY_ONE);
+ public static final RecordField LINEAGE_START_DATE = new SimpleRecordField(Names.LINEAGE_START_DATE, FieldType.LONG, EXACTLY_ONE);
+ public static final RecordField COMPONENT_ID = new SimpleRecordField(Names.COMPONENT_ID, FieldType.STRING, ZERO_OR_ONE);
+ public static final RecordField COMPONENT_TYPE = new SimpleRecordField(Names.COMPONENT_TYPE, FieldType.STRING, ZERO_OR_ONE);
+ public static final RecordField FLOWFILE_UUID = new SimpleRecordField(Names.FLOWFILE_UUID, FieldType.STRING, EXACTLY_ONE);
+ public static final RecordField EVENT_DETAILS = new SimpleRecordField(Names.EVENT_DETAILS, FieldType.STRING, ZERO_OR_ONE);
+ public static final RecordField SOURCE_QUEUE_IDENTIFIER = new SimpleRecordField(Names.SOURCE_QUEUE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+
+ // Attributes
+ public static final RecordField ATTRIBUTE_NAME = new SimpleRecordField(Names.ATTRIBUTE_NAME, FieldType.LONG_STRING, EXACTLY_ONE);
+ public static final RecordField ATTRIBUTE_VALUE_REQUIRED = new SimpleRecordField(Names.ATTRIBUTE_VALUE, FieldType.LONG_STRING, EXACTLY_ONE);
+ public static final RecordField ATTRIBUTE_VALUE_OPTIONAL = new SimpleRecordField(Names.ATTRIBUTE_VALUE, FieldType.LONG_STRING, ZERO_OR_ONE);
+
+ public static final RecordField PREVIOUS_ATTRIBUTES = new MapRecordField(Names.PREVIOUS_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_REQUIRED, EXACTLY_ONE);
+ public static final RecordField UPDATED_ATTRIBUTES = new MapRecordField(Names.UPDATED_ATTRIBUTES, ATTRIBUTE_NAME, ATTRIBUTE_VALUE_OPTIONAL, EXACTLY_ONE);
+
+ // Content Claims
+ public static final RecordField CONTENT_CLAIM_CONTAINER = new SimpleRecordField(Names.CONTENT_CLAIM_CONTAINER, FieldType.STRING, EXACTLY_ONE);
+ public static final RecordField CONTENT_CLAIM_SECTION = new SimpleRecordField(Names.CONTENT_CLAIM_SECTION, FieldType.STRING, EXACTLY_ONE);
+ public static final RecordField CONTENT_CLAIM_IDENTIFIER = new SimpleRecordField(Names.CONTENT_CLAIM_IDENTIFIER, FieldType.STRING, EXACTLY_ONE);
+ public static final RecordField CONTENT_CLAIM_OFFSET = new SimpleRecordField(Names.CONTENT_CLAIM_OFFSET, FieldType.LONG, EXACTLY_ONE);
+ public static final RecordField CONTENT_CLAIM_SIZE = new SimpleRecordField(Names.CONTENT_CLAIM_SIZE, FieldType.LONG, EXACTLY_ONE);
+ public static final RecordField CURRENT_CONTENT_CLAIM = new ComplexRecordField(Names.CONTENT_CLAIM, ZERO_OR_ONE,
+ CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+ public static final RecordField PREVIOUS_CONTENT_CLAIM = new ComplexRecordField(Names.PREVIOUS_CONTENT_CLAIM, ZERO_OR_ONE,
+ CONTENT_CLAIM_CONTAINER, CONTENT_CLAIM_SECTION, CONTENT_CLAIM_IDENTIFIER, CONTENT_CLAIM_OFFSET, CONTENT_CLAIM_SIZE);
+
+ // EventType-Specific fields
+ // for FORK, JOIN, CLONE, REPLAY
+ public static final RecordField PARENT_UUIDS = new SimpleRecordField(Names.PARENT_UUIDS, FieldType.STRING, ZERO_OR_MORE);
+ public static final RecordField CHILD_UUIDS = new SimpleRecordField(Names.CHILD_UUIDS, FieldType.STRING, ZERO_OR_MORE);
+
+ // for SEND/RECEIVE/FETCH
+ public static final RecordField TRANSIT_URI = new SimpleRecordField(Names.TRANSIT_URI, FieldType.STRING, ZERO_OR_ONE);
+ public static final RecordField SOURCE_SYSTEM_FLOWFILE_IDENTIFIER = new SimpleRecordField(Names.SOURCE_QUEUE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+
+ // for ADD_INFO
+ public static final RecordField ALTERNATE_IDENTIFIER = new SimpleRecordField(Names.ALTERNATE_IDENTIFIER, FieldType.STRING, ZERO_OR_ONE);
+
+ // for ROUTE
+ public static final RecordField RELATIONSHIP = new SimpleRecordField(Names.RELATIONSHIP, FieldType.STRING, ZERO_OR_ONE);
+}
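
The Repetition constants are what make this layout explicit: EXACTLY_ONE fields must always be populated, while ZERO_OR_ONE and ZERO_OR_MORE fields admit absent or repeated values. Declaring an additional optional field would look like the following; the field below is hypothetical and not part of this commit:

    import static org.apache.nifi.repository.schema.Repetition.ZERO_OR_ONE;

    import org.apache.nifi.repository.schema.FieldType;
    import org.apache.nifi.repository.schema.RecordField;
    import org.apache.nifi.repository.schema.SimpleRecordField;

    public class ExtraFieldSketch {

        // Hypothetical optional field: because the schema describing it travels
        // with the serialized data, readers resolve it by name and repetition
        // rather than by position.
        public static final RecordField EXAMPLE_ANNOTATION =
                new SimpleRecordField("Example Annotation", FieldType.STRING, ZERO_OR_ONE);
    }
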
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
new file mode 100644
index 0000000..d70bd39
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/schema/ProvenanceEventSchema.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.schema;
+
+import static org.apache.nifi.provenance.schema.EventRecordFields.ALTERNATE_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.EventRecordFields.CHILD_UUIDS;
+import static org.apache.nifi.provenance.schema.EventRecordFields.COMPONENT_ID;
+import static org.apache.nifi.provenance.schema.EventRecordFields.COMPONENT_TYPE;
+import static org.apache.nifi.provenance.schema.EventRecordFields.CURRENT_CONTENT_CLAIM;
+import static org.apache.nifi.provenance.schema.EventRecordFields.EVENT_DETAILS;
+import static org.apache.nifi.provenance.schema.EventRecordFields.EVENT_DURATION;
+import static org.apache.nifi.provenance.schema.EventRecordFields.EVENT_TIME;
+import static org.apache.nifi.provenance.schema.EventRecordFields.EVENT_TYPE;
+import static org.apache.nifi.provenance.schema.EventRecordFields.FLOWFILE_ENTRY_DATE;
+import static org.apache.nifi.provenance.schema.EventRecordFields.FLOWFILE_UUID;
+import static org.apache.nifi.provenance.schema.EventRecordFields.LINEAGE_START_DATE;
+import static org.apache.nifi.provenance.schema.EventRecordFields.PARENT_UUIDS;
+import static org.apache.nifi.provenance.schema.EventRecordFields.PREVIOUS_ATTRIBUTES;
+import static org.apache.nifi.provenance.schema.EventRecordFields.PREVIOUS_CONTENT_CLAIM;
+import static org.apache.nifi.provenance.schema.EventRecordFields.RECORD_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.EventRecordFields.RELATIONSHIP;
+import static org.apache.nifi.provenance.schema.EventRecordFields.SOURCE_QUEUE_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.EventRecordFields.SOURCE_SYSTEM_FLOWFILE_IDENTIFIER;
+import static org.apache.nifi.provenance.schema.EventRecordFields.TRANSIT_URI;
+import static org.apache.nifi.provenance.schema.EventRecordFields.UPDATED_ATTRIBUTES;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.repository.schema.RecordField;
+import org.apache.nifi.repository.schema.RecordSchema;
+
+public class ProvenanceEventSchema {
+ public static final RecordSchema PROVENANCE_EVENT_SCHEMA_V1 = buildSchemaV1();
+
+ private static RecordSchema buildSchemaV1() {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(RECORD_IDENTIFIER);
+ fields.add(EVENT_TYPE);
+ fields.add(EVENT_TIME);
+ fields.add(FLOWFILE_ENTRY_DATE);
+ fields.add(EVENT_DURATION);
+ fields.add(LINEAGE_START_DATE);
+ fields.add(COMPONENT_ID);
+ fields.add(COMPONENT_TYPE);
+ fields.add(FLOWFILE_UUID);
+ fields.add(EVENT_DETAILS);
+ fields.add(PREVIOUS_ATTRIBUTES);
+ fields.add(UPDATED_ATTRIBUTES);
+ fields.add(CURRENT_CONTENT_CLAIM);
+ fields.add(PREVIOUS_CONTENT_CLAIM);
+ fields.add(SOURCE_QUEUE_IDENTIFIER);
+
+ // EventType-Specific fields
+ fields.add(PARENT_UUIDS); // for FORK, JOIN, CLONE, REPLAY events
+ fields.add(CHILD_UUIDS); // for FORK, JOIN, CLONE, REPLAY events
+ fields.add(TRANSIT_URI); // for SEND/RECEIVE/FETCH events
+ fields.add(SOURCE_SYSTEM_FLOWFILE_IDENTIFIER); // for SEND/RECEIVE events
+ fields.add(ALTERNATE_IDENTIFIER); // for ADD_INFO events
+ fields.add(RELATIONSHIP); // for ROUTE events
+
+ final RecordSchema schema = new RecordSchema(fields);
+ return schema;
+ }
+}
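
Writer and reader agree on layout through this schema object rather than through matching sequences of read/write calls, so the V1 layout can be inspected by walking its fields. A small sketch, assuming accessors RecordSchema.getFields(), RecordField.getFieldName(), and RecordField.getRepetition() on the schema classes (they are not shown in this diff):

    import org.apache.nifi.provenance.schema.ProvenanceEventSchema;
    import org.apache.nifi.repository.schema.RecordField;

    public class SchemaDumpSketch {

        public static void main(final String[] args) {
            // Field order here is the on-disk write order for V1 records.
            for (final RecordField field : ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1.getFields()) {
                System.out.println(field.getFieldName() + " [" + field.getRepetition() + "]");
            }
        }
    }
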
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
new file mode 100644
index 0000000..056829a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.serialization;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardRecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CompressableRecordReader implements RecordReader {
+ private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
+
+ private final ByteCountingInputStream rawInputStream;
+ private final String filename;
+ private final int serializationVersion;
+ private final boolean compressed;
+ private final TocReader tocReader;
+ private final int headerLength;
+ private final int maxAttributeChars;
+
+ private DataInputStream dis;
+ private ByteCountingInputStream byteCountingIn;
+
+ public CompressableRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
+ this(in, filename, null, maxAttributeChars);
+ }
+
+ public CompressableRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
+ logger.trace("Creating RecordReader for {}", filename);
+
+ rawInputStream = new ByteCountingInputStream(in);
+ this.maxAttributeChars = maxAttributeChars;
+
+ final InputStream limitedStream;
+ if (tocReader == null) {
+ limitedStream = rawInputStream;
+ } else {
+ final long offset1 = tocReader.getBlockOffset(1);
+ if (offset1 < 0) {
+ limitedStream = rawInputStream;
+ } else {
+ limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
+ }
+ }
+
+ final InputStream readableStream;
+ if (filename.endsWith(".gz")) {
+ readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
+ compressed = true;
+ } else {
+ readableStream = new BufferedInputStream(limitedStream);
+ compressed = false;
+ }
+
+ byteCountingIn = new ByteCountingInputStream(readableStream);
+ dis = new DataInputStream(byteCountingIn);
+
+ final String repoClassName = dis.readUTF();
+ final int serializationVersion = dis.readInt();
+ headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
+
+ this.serializationVersion = serializationVersion;
+ this.filename = filename;
+ this.tocReader = tocReader;
+
+ readHeader(dis, serializationVersion);
+ }
+
+ @Override
+ public void skipToBlock(final int blockIndex) throws IOException {
+ if (tocReader == null) {
+ throw new IllegalStateException("Cannot skip to block " + blockIndex + " for Provenance Log " + filename + " because no Table-of-Contents file was found for this Log");
+ }
+
+ if (blockIndex < 0) {
+ throw new IllegalArgumentException("Cannot skip to block " + blockIndex + " because the value is negative");
+ }
+
+ if (blockIndex == getBlockIndex()) {
+ return;
+ }
+
+ final long offset = tocReader.getBlockOffset(blockIndex);
+ if (offset < 0) {
+ throw new IOException("Unable to find block " + blockIndex + " in Provenance Log " + filename);
+ }
+
+ final long curOffset = rawInputStream.getBytesConsumed();
+
+ final long bytesToSkip = offset - curOffset;
+ if (bytesToSkip >= 0) {
+ try {
+ StreamUtils.skip(rawInputStream, bytesToSkip);
+ logger.debug("Skipped stream from offset {} to {} ({} bytes skipped)", curOffset, offset, bytesToSkip);
+ } catch (final IOException e) {
+ throw new IOException("Failed to skip to offset " + offset + " for block " + blockIndex + " of Provenance Log " + filename, e);
+ }
+
+ resetStreamForNextBlock();
+ }
+ }
+
+ private void resetStreamForNextBlock() throws IOException {
+ final InputStream limitedStream;
+ if (tocReader == null) {
+ limitedStream = rawInputStream;
+ } else {
+ final long offset = tocReader.getBlockOffset(1 + getBlockIndex());
+ if (offset < 0) {
+ limitedStream = rawInputStream;
+ } else {
+ limitedStream = new LimitingInputStream(rawInputStream, offset - rawInputStream.getBytesConsumed());
+ }
+ }
+
+ final InputStream readableStream;
+ if (compressed) {
+ readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
+ } else {
+ readableStream = new BufferedInputStream(limitedStream);
+ }
+
+ byteCountingIn = new ByteCountingInputStream(readableStream, rawInputStream.getBytesConsumed());
+ dis = new DataInputStream(byteCountingIn);
+ }
+
+
+ @Override
+ public TocReader getTocReader() {
+ return tocReader;
+ }
+
+ @Override
+ public boolean isBlockIndexAvailable() {
+ return tocReader != null;
+ }
+
+ @Override
+ public int getBlockIndex() {
+ if (tocReader == null) {
+ throw new IllegalStateException("Cannot determine Block Index because no Table-of-Contents could be found for Provenance Log " + filename);
+ }
+
+ return tocReader.getBlockIndex(rawInputStream.getBytesConsumed());
+ }
+
+ @Override
+ public long getBytesConsumed() {
+ return byteCountingIn.getBytesConsumed();
+ }
+
+ private boolean isData() throws IOException {
+ byteCountingIn.mark(1);
+ int nextByte = byteCountingIn.read();
+ byteCountingIn.reset();
+
+ if (nextByte < 0) {
+ try {
+ resetStreamForNextBlock();
+ } catch (final EOFException eof) {
+ return false;
+ }
+
+ byteCountingIn.mark(1);
+ nextByte = byteCountingIn.read();
+ byteCountingIn.reset();
+ }
+
+ return nextByte >= 0;
+ }
+
+ @Override
+ public long getMaxEventId() throws IOException {
+ if (tocReader != null) {
+ final long lastBlockOffset = tocReader.getLastBlockOffset();
+ skipToBlock(tocReader.getBlockIndex(lastBlockOffset));
+ }
+
+ ProvenanceEventRecord record;
+ ProvenanceEventRecord lastRecord = null;
+ try {
+ while ((record = nextRecord()) != null) {
+ lastRecord = record;
+ }
+ } catch (final EOFException eof) {
+ // This can happen if we stop NiFi while the record is being written.
+ // This is OK, we just ignore this record. The session will not have been
+ // committed, so we can just process the FlowFile again.
+ }
+
+ return lastRecord == null ? -1L : lastRecord.getEventId();
+ }
+
+ @Override
+ public void close() throws IOException {
+ logger.trace("Closing Record Reader for {}", filename);
+
+ try {
+ dis.close();
+ } finally {
+ try {
+ rawInputStream.close();
+ } finally {
+ if (tocReader != null) {
+ tocReader.close();
+ }
+ }
+ }
+ }
+
+ @Override
+ public void skip(final long bytesToSkip) throws IOException {
+ StreamUtils.skip(dis, bytesToSkip);
+ }
+
+ @Override
+ public void skipTo(final long position) throws IOException {
+ // we are subtracting headerLength from the number of bytes consumed because we used to
+ // consider the offset of the first record "0" - now we consider it whatever position
+ // it really is in the stream.
+ final long currentPosition = byteCountingIn.getBytesConsumed() - headerLength;
+ if (currentPosition == position) {
+ return;
+ }
+ if (currentPosition > position) {
+ throw new IOException("Cannot skip to byte offset " + position + " in stream because already at byte offset " + currentPosition);
+ }
+
+ final long toSkip = position - currentPosition;
+ StreamUtils.skip(dis, toSkip);
+ }
+
+ protected String getFilename() {
+ return filename;
+ }
+
+ protected int getMaxAttributeLength() {
+ return maxAttributeChars;
+ }
+
+ @Override
+ public StandardProvenanceEventRecord nextRecord() throws IOException {
+ if (isData()) {
+ return nextRecord(dis, serializationVersion);
+ } else {
+ return null;
+ }
+ }
+
+ protected abstract StandardProvenanceEventRecord nextRecord(DataInputStream in, int serializationVersion) throws IOException;
+
+ protected void readHeader(DataInputStream in, int serializationVersion) throws IOException {
+ }
+}
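
skipToBlock() is what turns the Table of Contents into random access: it skips raw bytes to a recorded block offset and then rebuilds the (possibly gzip) decoding stream for that block. A usage sketch against the RecordReader methods shown above; the block index would typically come from an index lookup, such as the blockIndex passed to IndexingAction:

    import java.io.IOException;

    import org.apache.nifi.provenance.StandardProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.RecordReader;

    public class BlockSeekSketch {

        // Seek directly to the block containing an event, then scan forward to
        // it, instead of decompressing the whole provenance log from the start.
        static StandardProvenanceEventRecord findEvent(final RecordReader reader, final int blockIndex, final long eventId) throws IOException {
            reader.skipToBlock(blockIndex);

            StandardProvenanceEventRecord record;
            while ((record = reader.nextRecord()) != null) {
                if (record.getEventId() == eventId) {
                    return record;
                }
            }

            return null; // reached the end of the log without finding the event
        }
    }
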
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
new file mode 100644
index 0000000..fa0e390
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordWriter.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.serialization;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.nifi.provenance.AbstractRecordWriter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocWriter;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CompressableRecordWriter extends AbstractRecordWriter {
+ private static final Logger logger = LoggerFactory.getLogger(CompressableRecordWriter.class);
+
+ private final FileOutputStream fos;
+ private final ByteCountingOutputStream rawOutStream;
+ private final boolean compressed;
+ private final int uncompressedBlockSize;
+
+ private DataOutputStream out;
+ private ByteCountingOutputStream byteCountingOut;
+ private long lastBlockOffset = 0L;
+ private int recordCount = 0;
+
+ public CompressableRecordWriter(final File file, final TocWriter writer, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+ super(file, writer);
+ logger.trace("Creating Record Writer for {}", file.getName());
+
+ this.compressed = compressed;
+ this.fos = new FileOutputStream(file);
+ rawOutStream = new ByteCountingOutputStream(fos);
+ this.uncompressedBlockSize = uncompressedBlockSize;
+ }
+
+ public CompressableRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
+ super(null, tocWriter);
+ this.fos = null;
+
+ this.compressed = compressed;
+ this.uncompressedBlockSize = uncompressedBlockSize;
+ this.rawOutStream = new ByteCountingOutputStream(out);
+ }
+
+ @Override
+ public synchronized void writeHeader(final long firstEventId) throws IOException {
+ if (isDirty()) {
+ throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
+ }
+
+ try {
+ lastBlockOffset = rawOutStream.getBytesWritten();
+ resetWriteStream(firstEventId);
+ out.writeUTF(getSerializationName());
+ out.writeInt(getSerializationVersion());
+ writeHeader(firstEventId, out);
+ out.flush();
+ lastBlockOffset = rawOutStream.getBytesWritten();
+ } catch (final IOException ioe) {
+ markDirty();
+ throw ioe;
+ }
+ }
+
+ /**
+ * Resets the streams to prepare for a new block
+ *
+ * @param eventId the first id that will be written to the new block
+ * @throws IOException if unable to flush/close the current streams properly
+ */
+ private void resetWriteStream(final long eventId) throws IOException {
+ try {
+ if (out != null) {
+ out.flush();
+ }
+
+ final long byteOffset = (byteCountingOut == null) ? rawOutStream.getBytesWritten() : byteCountingOut.getBytesWritten();
+ final TocWriter tocWriter = getTocWriter();
+
+ final OutputStream writableStream;
+ if (compressed) {
+ // because of the way that GZIPOutputStream works, we need to call close() on it in order for it
+ // to write its trailing bytes. But we don't want to close the underlying OutputStream, so we wrap
+ // the underlying OutputStream in a NonCloseableOutputStream
+ // We don't have to check if the writer is dirty because we will have already checked before calling this method.
+ if (out != null) {
+ out.close();
+ }
+
+ if (tocWriter != null) {
+ tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
+ }
+
+ writableStream = new BufferedOutputStream(new GZIPOutputStream(new NonCloseableOutputStream(rawOutStream), 1), 65536);
+ } else {
+ if (tocWriter != null) {
+ tocWriter.addBlockOffset(rawOutStream.getBytesWritten(), eventId);
+ }
+
+ writableStream = new BufferedOutputStream(rawOutStream, 65536);
+ }
+
+ this.byteCountingOut = new ByteCountingOutputStream(writableStream, byteOffset);
+ this.out = new DataOutputStream(byteCountingOut);
+ resetDirtyFlag();
+ } catch (final IOException ioe) {
+ markDirty();
+ throw ioe;
+ }
+ }
+
+ @Override
+ public long writeRecord(final ProvenanceEventRecord record, final long recordIdentifier) throws IOException {
+ if (isDirty()) {
+ throw new IOException("Cannot update Provenance Repository because this Record Writer has already failed to write to the Repository");
+ }
+
+ try {
+ final long startBytes = byteCountingOut.getBytesWritten();
+
+ // add a new block to the TOC if needed.
+ if (getTocWriter() != null && (startBytes - lastBlockOffset >= uncompressedBlockSize)) {
+ lastBlockOffset = startBytes;
+
+ if (compressed) {
+ // roll over to a new compressed block: resetWriteStream() closes the current
+ // GZIPOutputStream so that its trailing bytes are written before the next block starts
+ resetWriteStream(recordIdentifier);
+ }
+ }
+
+ writeRecord(record, recordIdentifier, out);
+
+ recordCount++;
+ return byteCountingOut.getBytesWritten() - startBytes;
+ } catch (final IOException ioe) {
+ markDirty();
+ throw ioe;
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ @Override
+ public synchronized int getRecordsWritten() {
+ return recordCount;
+ }
+
+ @Override
+ protected OutputStream getBufferedOutputStream() {
+ return out;
+ }
+
+ @Override
+ protected OutputStream getUnderlyingOutputStream() {
+ return fos;
+ }
+
+ @Override
+ protected void syncUnderlyingOutputStream() throws IOException {
+ if (fos != null) {
+ fos.getFD().sync();
+ }
+ }
+
+ protected abstract void writeRecord(final ProvenanceEventRecord event, final long eventId, final DataOutputStream out) throws IOException;
+
+ protected abstract void writeHeader(final long firstEventId, final DataOutputStream out) throws IOException;
+
+ protected abstract int getSerializationVersion();
+
+ protected abstract String getSerializationName();
+}
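
CompressableRecordWriter owns all of the compression, TOC, and block-rollover logic; a
concrete writer only supplies the four abstract methods at the bottom of the class. The real
implementations are StandardRecordWriter and ByteArraySchemaRecordWriter; the following is
only a hypothetical minimal subclass to show the contract, and its field layout, version
number, and serialization name are invented for illustration:

    import java.io.File;
    import java.io.IOException;

    import org.apache.nifi.provenance.ProvenanceEventRecord;
    import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
    import org.apache.nifi.provenance.toc.TocWriter;
    import org.apache.nifi.stream.io.DataOutputStream;

    public class MinimalRecordWriter extends CompressableRecordWriter {

        public MinimalRecordWriter(final File file, final TocWriter tocWriter,
                final boolean compressed, final int uncompressedBlockSize) throws IOException {
            super(file, tocWriter, compressed, uncompressedBlockSize);
        }

        @Override
        protected void writeHeader(final long firstEventId, final DataOutputStream out) throws IOException {
            // the base class has already written the serialization name and version;
            // write any format-specific header fields here. This illustrative format has none.
        }

        @Override
        protected void writeRecord(final ProvenanceEventRecord event, final long eventId, final DataOutputStream out) throws IOException {
            // invented field layout, not the actual NiFi on-disk format
            out.writeLong(eventId);
            out.writeUTF(event.getEventType().name());
            out.writeLong(event.getEventTime());
        }

        @Override
        protected int getSerializationVersion() {
            return 1; // hypothetical version
        }

        @Override
        protected String getSerializationName() {
            return "org.example.MinimalRecordWriter"; // hypothetical name
        }
    }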
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
new file mode 100644
index 0000000..38a4cc9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/EmptyRecordReader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.serialization;
+
+import java.io.IOException;
+
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.toc.TocReader;
+
+public class EmptyRecordReader implements RecordReader {
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public StandardProvenanceEventRecord nextRecord() throws IOException {
+ return null;
+ }
+
+ @Override
+ public void skip(long bytesToSkip) throws IOException {
+ }
+
+ @Override
+ public void skipTo(long position) throws IOException {
+ }
+
+ @Override
+ public void skipToBlock(int blockIndex) throws IOException {
+ }
+
+ @Override
+ public int getBlockIndex() {
+ return 0;
+ }
+
+ @Override
+ public boolean isBlockIndexAvailable() {
+ return false;
+ }
+
+ @Override
+ public TocReader getTocReader() {
+ return null;
+ }
+
+ @Override
+ public long getBytesConsumed() {
+ return 0;
+ }
+
+ @Override
+ public long getMaxEventId() throws IOException {
+ return 0;
+ }
+}
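
EmptyRecordReader is a null object: every method is a no-op and nextRecord() returns null
immediately, so a caller that loops until nextRecord() returns null handles an empty or
truncated event file with no special casing. A usage sketch (the argument list is assumed
from the existing RecordReaders factory, and process(...) is a placeholder for whatever the
caller does with each event):

    final RecordReader reader = RecordReaders.newRecordReader(file, provenanceLogFiles, maxAttributeChars);
    try {
        StandardProvenanceEventRecord event;
        while ((event = reader.nextRecord()) != null) {
            process(event); // never entered when the factory returned an EmptyRecordReader
        }
    } finally {
        reader.close();
    }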
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
index 7889cd6..24efcbd 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordReaders.java
@@ -16,6 +16,9 @@
*/
package org.apache.nifi.provenance.serialization;
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -23,8 +26,12 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Collection;
+import java.util.zip.GZIPInputStream;
+import org.apache.nifi.provenance.ByteArraySchemaRecordReader;
+import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
import org.apache.nifi.provenance.StandardRecordReader;
+import org.apache.nifi.provenance.StandardRecordWriter;
import org.apache.nifi.provenance.lucene.LuceneUtil;
import org.apache.nifi.provenance.toc.StandardTocReader;
import org.apache.nifi.provenance.toc.TocReader;
@@ -101,11 +108,39 @@ public class RecordReaders {
}
final File tocFile = TocUtil.getTocFile(file);
- if ( tocFile.exists() ) {
- final TocReader tocReader = new StandardTocReader(tocFile);
- return new StandardRecordReader(fis, filename, tocReader, maxAttributeChars);
- } else {
- return new StandardRecordReader(fis, filename, maxAttributeChars);
+
+ final InputStream bufferedInStream = new BufferedInputStream(fis);
+ final String serializationName;
+ try {
+ bufferedInStream.mark(4096);
+ final InputStream in = filename.endsWith(".gz") ? new GZIPInputStream(bufferedInStream) : bufferedInStream;
+ final DataInputStream dis = new DataInputStream(in);
+ serializationName = dis.readUTF();
+ bufferedInStream.reset();
+ } catch (final EOFException eof) {
+ return new EmptyRecordReader();
+ }
+
+ switch (serializationName) {
+ case StandardRecordWriter.SERIALIZATION_NAME: {
+ if (tocFile.exists()) {
+ final TocReader tocReader = new StandardTocReader(tocFile);
+ return new StandardRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
+ } else {
+ return new StandardRecordReader(bufferedInStream, filename, maxAttributeChars);
+ }
+ }
+ case ByteArraySchemaRecordWriter.SERIALIZATION_NAME: {
+ if (tocFile.exists()) {
+ final TocReader tocReader = new StandardTocReader(tocFile);
+ return new ByteArraySchemaRecordReader(bufferedInStream, filename, tocReader, maxAttributeChars);
+ } else {
+ return new ByteArraySchemaRecordReader(bufferedInStream, filename, maxAttributeChars);
+ }
+ }
+ default: {
+ throw new IOException("Unable to read data from file " + file + " because the file was written using an unknown Serializer: " + serializationName);
+ }
}
} catch (final IOException ioe) {
if ( fis != null ) {
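
The switch above works because every writer begins its file with writeUTF(serializationName)
followed by writeInt(serializationVersion) (see CompressableRecordWriter.writeHeader earlier
in this commit): the factory marks the buffered stream, reads just the name - decompressing
first when the file name ends in .gz - and resets, so the selected reader re-reads the header
from byte 0. A self-contained round trip of that sniffing, as a sketch:

    import java.io.*;

    public class HeaderSniffDemo {
        public static void main(String[] args) throws IOException {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            final DataOutputStream header = new DataOutputStream(bytes);
            header.writeUTF("org.apache.nifi.provenance.PersistentProvenanceRepository"); // StandardRecordWriter.SERIALIZATION_NAME
            header.writeInt(9); // serialization version
            header.flush();

            final InputStream in = new BufferedInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            in.mark(4096); // read limit must cover the longest serialization name
            final String name = new DataInputStream(in).readUTF();
            in.reset();    // rewound: the chosen reader re-reads the header from the start

            System.out.println(name); // the name selects the reader; the stream is untouched
        }
    }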
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
index b157ccc..17dd75c 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriter.java
@@ -44,6 +44,13 @@ public interface RecordWriter extends Closeable {
long writeRecord(ProvenanceEventRecord record, long recordIdentifier) throws IOException;
/**
+ * Flushes any data that is held in a buffer to the underlying storage mechanism
+ *
+ * @throws IOException if unable to flush the bytes
+ */
+ void flush() throws IOException;
+
+ /**
* @return the number of Records that have been written to this RecordWriter
*/
int getRecordsWritten();
http://git-wip-us.apache.org/repos/asf/nifi/blob/1be08714/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
index cf8f7b4..be4c9cf 100644
--- a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
+++ b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/RecordWriters.java
@@ -19,7 +19,7 @@ package org.apache.nifi.provenance.serialization;
import java.io.File;
import java.io.IOException;
-import org.apache.nifi.provenance.StandardRecordWriter;
+import org.apache.nifi.provenance.ByteArraySchemaRecordWriter;
import org.apache.nifi.provenance.toc.StandardTocWriter;
import org.apache.nifi.provenance.toc.TocUtil;
import org.apache.nifi.provenance.toc.TocWriter;
@@ -27,13 +27,13 @@ import org.apache.nifi.provenance.toc.TocWriter;
public class RecordWriters {
private static final int DEFAULT_COMPRESSION_BLOCK_SIZE = 1024 * 1024; // 1 MB
- public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
- return newRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
+ public static RecordWriter newSchemaRecordWriter(final File file, final boolean compressed, final boolean createToc) throws IOException {
+ return newSchemaRecordWriter(file, compressed, createToc, DEFAULT_COMPRESSION_BLOCK_SIZE);
}
- public static RecordWriter newRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
+ public static RecordWriter newSchemaRecordWriter(final File file, final boolean compressed, final boolean createToc, final int compressionBlockBytes) throws IOException {
final TocWriter tocWriter = createToc ? new StandardTocWriter(TocUtil.getTocFile(file), false, false) : null;
- return new StandardRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
+ return new ByteArraySchemaRecordWriter(file, tocWriter, compressed, compressionBlockBytes);
}
}
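
Callers of the old newRecordWriter(...) now go through newSchemaRecordWriter(...) and get the
schema-based format transparently; the writer lifecycle itself is unchanged. A minimal usage
sketch (file, firstEventId, and event are assumed to be supplied by the caller):

    // write one event to a compressed, TOC-backed provenance file
    final RecordWriter writer = RecordWriters.newSchemaRecordWriter(file, true, true);
    try {
        writer.writeHeader(firstEventId);        // serialization name + version + format header
        writer.writeRecord(event, firstEventId); // returns the number of bytes written
        writer.flush();                          // the new RecordWriter.flush() from this commit
    } finally {
        writer.close();
    }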