You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/27 21:47:33 UTC
[06/12] incubator-nifi git commit: bug fixes and additional pieces of
repo implemented
bug fixes and additional pieces of repo implemented
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b95e7569
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b95e7569
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b95e7569
Branch: refs/heads/journaling-prov-repo
Commit: b95e7569f75c7beb2fd214d3304ae9630f4a8545
Parents: a68bef6
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Feb 12 19:19:37 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Feb 12 19:19:37 2015 -0500
----------------------------------------------------------------------
.../apache/nifi/provenance/EventIdLocation.java | 31 +++
.../provenance/IdEnrichedProvenanceEvent.java | 175 ++++++++++++++++
.../nifi/provenance/StandardLineageResult.java | 2 +-
.../org/apache/nifi/util/file/FileUtils.java | 1 -
.../MockProvenanceEventRepository.java | 41 +++-
.../manager/impl/ClusteredEventAccess.java | 32 ++-
.../repository/StandardProcessSession.java | 164 ++++++---------
.../nifi/controller/tasks/ExpireFlowFiles.java | 27 ++-
.../repository/TestStandardProcessSession.java | 13 +-
.../nifi/web/controller/ControllerFacade.java | 6 +-
.../pom.xml | 5 +
.../JournalingProvenanceRepository.java | 31 +--
.../config/JournalingRepositoryConfig.java | 2 +-
.../journals/StandardJournalReader.java | 2 +-
.../partition/JournalingPartition.java | 6 +
.../partition/QueuingPartitionManager.java | 39 +++-
.../journaling/query/QueryManager.java | 42 ++++
.../journaling/query/StandardQueryManager.java | 144 ++++++++++++++
...he.nifi.provenance.ProvenanceEventRepository | 15 ++
.../TestJournalingProvenanceRepository.java | 144 ++++++++++++++
.../nifi/provenance/journaling/TestUtil.java | 8 +
.../PersistentProvenanceRepository.java | 80 ++++++--
.../nifi/provenance/lucene/DocsReader.java | 21 +-
.../nifi/provenance/lucene/IndexSearch.java | 8 +-
.../nifi/provenance/lucene/LineageQuery.java | 11 +-
.../TestPersistentProvenanceRepository.java | 44 ++---
.../nifi-provenance-repository-nar/pom.xml | 4 +
.../VolatileProvenanceRepository.java | 198 +------------------
.../nifi-provenance-repository-bundle/pom.xml | 6 +
29 files changed, 909 insertions(+), 393 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java
new file mode 100644
index 0000000..9cc6c4d
--- /dev/null
+++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/EventIdLocation.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import org.apache.nifi.provenance.StorageLocation;
+
+/**
+ * A {@link StorageLocation} that identifies a stored provenance event solely by its
+ * numeric event id. Instances are immutable; two locations are equal when they carry
+ * the same id, which makes them safe to use as map keys or set members.
+ */
+public class EventIdLocation implements StorageLocation {
+ private final long id;
+
+ /**
+ * @param id the id of the event that this location refers to
+ */
+ public EventIdLocation(final long id) {
+ this.id = id;
+ }
+
+ /**
+ * @return the event id that was supplied at construction time
+ */
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ return id == ((EventIdLocation) obj).id;
+ }
+
+ @Override
+ public int hashCode() {
+ // same folding that Long#hashCode() performs, written out to avoid boxing
+ return (int) (id ^ (id >>> 32));
+ }
+
+ @Override
+ public String toString() {
+ return "EventIdLocation[id=" + id + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java
new file mode 100644
index 0000000..4ef0e5d
--- /dev/null
+++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/IdEnrichedProvenanceEvent.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StorageLocation;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+
+/**
+ * Wraps a {@link ProvenanceEventRecord} and pairs it with an event id so that it can be
+ * handed out as a {@link StoredProvenanceEvent}.
+ *
+ * <p>{@link #getEventId()} and {@link #getStorageLocation()} are answered from the id held
+ * by this wrapper; every other accessor, including {@link #toString()}, is forwarded
+ * unchanged to the wrapped event.</p>
+ */
+public class IdEnrichedProvenanceEvent implements StoredProvenanceEvent {
+
+ private final ProvenanceEventRecord event;
+ private final long id;
+
+ /**
+ * Wraps the given event, using the event's own id as the id of this wrapper.
+ *
+ * @param event the event to wrap
+ */
+ public IdEnrichedProvenanceEvent(final ProvenanceEventRecord event) {
+ this(event, event.getEventId());
+ }
+
+ /**
+ * Wraps the given event and reports the supplied id instead of the event's own.
+ *
+ * @param event the event to wrap
+ * @param id the id to report from {@link #getEventId()} and {@link #getStorageLocation()}
+ */
+ public IdEnrichedProvenanceEvent(final ProvenanceEventRecord event, final long id) {
+ this.event = event;
+ this.id = id;
+ }
+
+ /**
+ * @return a new {@link EventIdLocation} built from this wrapper's id
+ */
+ @Override
+ public StorageLocation getStorageLocation() {
+ return new EventIdLocation(id);
+ }
+
+ /**
+ * @return the id held by this wrapper, which is not necessarily the wrapped event's id
+ */
+ @Override
+ public long getEventId() {
+ return id;
+ }
+
+ // ---- everything below simply forwards to the wrapped event ----
+
+ @Override
+ public long getEventTime() {
+ return event.getEventTime();
+ }
+
+ @Override
+ public long getFlowFileEntryDate() {
+ return event.getFlowFileEntryDate();
+ }
+
+ @Override
+ public long getLineageStartDate() {
+ return event.getLineageStartDate();
+ }
+
+ @Override
+ public Set<String> getLineageIdentifiers() {
+ return event.getLineageIdentifiers();
+ }
+
+ @Override
+ public long getFileSize() {
+ return event.getFileSize();
+ }
+
+ @Override
+ public Long getPreviousFileSize() {
+ return event.getPreviousFileSize();
+ }
+
+ @Override
+ public long getEventDuration() {
+ return event.getEventDuration();
+ }
+
+ @Override
+ public ProvenanceEventType getEventType() {
+ return event.getEventType();
+ }
+
+ @Override
+ public Map<String, String> getAttributes() {
+ return event.getAttributes();
+ }
+
+ @Override
+ public Map<String, String> getPreviousAttributes() {
+ return event.getPreviousAttributes();
+ }
+
+ @Override
+ public Map<String, String> getUpdatedAttributes() {
+ return event.getUpdatedAttributes();
+ }
+
+ @Override
+ public String getComponentId() {
+ return event.getComponentId();
+ }
+
+ @Override
+ public String getComponentType() {
+ return event.getComponentType();
+ }
+
+ @Override
+ public String getTransitUri() {
+ return event.getTransitUri();
+ }
+
+ @Override
+ public String getSourceSystemFlowFileIdentifier() {
+ return event.getSourceSystemFlowFileIdentifier();
+ }
+
+ @Override
+ public String getFlowFileUuid() {
+ return event.getFlowFileUuid();
+ }
+
+ @Override
+ public List<String> getParentUuids() {
+ return event.getParentUuids();
+ }
+
+ @Override
+ public List<String> getChildUuids() {
+ return event.getChildUuids();
+ }
+
+ @Override
+ public String getAlternateIdentifierUri() {
+ return event.getAlternateIdentifierUri();
+ }
+
+ @Override
+ public String getDetails() {
+ return event.getDetails();
+ }
+
+ @Override
+ public String getRelationship() {
+ return event.getRelationship();
+ }
+
+ @Override
+ public String getSourceQueueIdentifier() {
+ return event.getSourceQueueIdentifier();
+ }
+
+ @Override
+ public String getContentClaimSection() {
+ return event.getContentClaimSection();
+ }
+
+ @Override
+ public String getPreviousContentClaimSection() {
+ return event.getPreviousContentClaimSection();
+ }
+
+ @Override
+ public String getContentClaimContainer() {
+ return event.getContentClaimContainer();
+ }
+
+ @Override
+ public String getPreviousContentClaimContainer() {
+ return event.getPreviousContentClaimContainer();
+ }
+
+ @Override
+ public String getContentClaimIdentifier() {
+ return event.getContentClaimIdentifier();
+ }
+
+ @Override
+ public String getPreviousContentClaimIdentifier() {
+ return event.getPreviousContentClaimIdentifier();
+ }
+
+ @Override
+ public Long getContentClaimOffset() {
+ return event.getContentClaimOffset();
+ }
+
+ @Override
+ public Long getPreviousContentClaimOffset() {
+ return event.getPreviousContentClaimOffset();
+ }
+
+ @Override
+ public String toString() {
+ return event.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
index afb56e8..0f454bd 100644
--- a/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
+++ b/nifi/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardLineageResult.java
@@ -178,7 +178,7 @@ public class StandardLineageResult implements ComputeLineageResult {
}
}
- public void update(final Collection<ProvenanceEventRecord> records) {
+ public void update(final Collection<StoredProvenanceEvent> records) {
writeLock.lock();
try {
relevantRecords.addAll(records);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
index 41a0557..71dbc79 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.util.file;
-import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java b/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java
index 241041a..c4caa71 100644
--- a/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java
+++ b/nifi/nifi-mock/src/main/java/org/apache/nifi/provenance/MockProvenanceEventRepository.java
@@ -18,6 +18,7 @@ package org.apache.nifi.provenance;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -30,11 +31,11 @@ import org.apache.nifi.provenance.search.SearchableField;
public class MockProvenanceEventRepository implements ProvenanceEventRepository {
- private final List<ProvenanceEventRecord> records = new ArrayList<>();
+ private final List<StoredProvenanceEvent> records = new ArrayList<>();
private final AtomicLong idGenerator = new AtomicLong(0L);
@Override
- public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
+ public void registerEvents(final Collection<ProvenanceEventRecord> events) {
for (final ProvenanceEventRecord event : events) {
registerEvent(event);
}
@@ -50,7 +51,7 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository
}
newRecord.setEventId(idGenerator.getAndIncrement());
- records.add(newRecord);
+ records.add(new IdEnrichedProvenanceEvent(newRecord));
}
@Override
@@ -58,7 +59,7 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository
}
@Override
- public List<ProvenanceEventRecord> getEvents(long firstRecordId, int maxRecords) throws IOException {
+ public List<StoredProvenanceEvent> getEvents(long firstRecordId, int maxRecords) throws IOException {
if (firstRecordId > records.size()) {
return Collections.emptyList();
}
@@ -92,7 +93,7 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository
}
@Override
- public ProvenanceEventRecord getEvent(long id) throws IOException {
+ public StoredProvenanceEvent getEvent(long id) throws IOException {
if (id > records.size()) {
return null;
}
@@ -128,4 +129,34 @@ public class MockProvenanceEventRepository implements ProvenanceEventRepository
public ProvenanceEventBuilder eventBuilder() {
return new StandardProvenanceEventRecord.Builder();
}
+
+ /**
+ * Reports the event time of the event looked up with id 0.
+ *
+ * @return that event's time, or null when the lookup yields no event
+ */
+ @Override
+ public Long getEarliestEventTime() throws IOException {
+ final StoredProvenanceEvent first = getEvent(0L);
+ return (first == null) ? null : Long.valueOf(first.getEventTime());
+ }
+
+ /**
+ * Resolves a storage location to the event it refers to. Only {@link EventIdLocation}
+ * is understood; the lookup is delegated to {@link #getEvent(long)} using the location's id.
+ *
+ * @throws IllegalArgumentException if the location is not an {@link EventIdLocation}
+ */
+ @Override
+ public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException {
+ if ( !(location instanceof EventIdLocation) ) {
+ throw new IllegalArgumentException("Invalid StorageLocation");
+ }
+
+ final long eventId = ((EventIdLocation) location).getId();
+ return getEvent(eventId);
+ }
+
+ /**
+ * Resolves each location in order via {@link #getEvent(StorageLocation)} and collects
+ * the events that were found; locations that resolve to null are left out of the result.
+ *
+ * @param storageLocations the locations to resolve
+ * @return the events found, in the order of their locations
+ */
+ @Override
+ public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException {
+ final List<StoredProvenanceEvent> resolved = new ArrayList<>(storageLocations.size());
+ for ( final StorageLocation storageLocation : storageLocations ) {
+ final StoredProvenanceEvent found = getEvent(storageLocation);
+ if ( found == null ) {
+ continue;
+ }
+ resolved.add(found);
+ }
+ return resolved;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
index 2015530..7780d04 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredEventAccess.java
@@ -18,6 +18,8 @@ package org.apache.nifi.cluster.manager.impl;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import org.apache.nifi.controller.status.ProcessGroupStatus;
@@ -25,6 +27,8 @@ import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.StorageLocation;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QuerySubmission;
@@ -59,12 +63,12 @@ public class ClusteredEventAccess implements EventAccess {
}
@Override
- public ProvenanceEventRecord getEvent(long eventId) throws IOException {
+ public StoredProvenanceEvent getEvent(long eventId) throws IOException {
return null; // stub: the requested id is ignored and no event is ever returned
}
@Override
- public List<ProvenanceEventRecord> getEvents(long startEventId, int maxEvents) throws IOException {
+ public List<StoredProvenanceEvent> getEvents(long startEventId, int maxEvents) throws IOException {
return new ArrayList<>(); // stub: both arguments are ignored; the result is always a new, empty list
}
@@ -88,10 +92,6 @@ public class ClusteredEventAccess implements EventAccess {
}
@Override
- public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
- }
-
- @Override
public ComputeLineageSubmission retrieveLineageSubmission(final String submissionId) {
return null;
}
@@ -130,6 +130,26 @@ public class ClusteredEventAccess implements EventAccess {
public void initialize(EventReporter eventReporter) throws IOException {
}
+
+ @Override
+ public Long getEarliestEventTime() throws IOException {
+ return null; // stub: this implementation never reports an earliest event time
+ }
+
+ @Override
+ public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException {
+ return null; // stub: the location is ignored and no event is ever returned
+ }
+
+ @Override
+ public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException {
+ return Collections.emptyList(); // stub: the locations are ignored; result is the immutable empty list
+ }
+
+ @Override
+ public void registerEvents(final Collection<ProvenanceEventRecord> events) throws IOException { // intentionally empty: the given events are not stored anywhere
+
+ }
};
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index dcb461c..899fccc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -28,11 +28,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -53,9 +51,6 @@ import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.repository.io.LongHolder;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.stream.io.BufferedOutputStream;
-import org.apache.nifi.stream.io.NonCloseableInputStream;
-import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.processor.ProcessSession;
@@ -74,6 +69,9 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceReporter;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.stream.io.BufferedOutputStream;
+import org.apache.nifi.stream.io.NonCloseableInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.NiFiProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -299,7 +297,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
resetReadClaim();
final long updateProvenanceStart = System.nanoTime();
- updateProvenanceRepo(checkpoint);
+ try {
+ updateProvenanceRepo(checkpoint);
+ } catch (final IOException ioe) {
+ rollback();
+ throw new ProcessException("Provenance Repository failed to update", ioe);
+ }
final long claimRemovalStart = System.nanoTime();
final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
@@ -497,7 +500,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
eventTypes.add(eventType);
}
- private void updateProvenanceRepo(final Checkpoint checkpoint) {
+ private void updateProvenanceRepo(final Checkpoint checkpoint) throws IOException {
// Update Provenance Repository
final ProvenanceEventRepository provenanceRepo = context.getProvenanceRepository();
@@ -641,46 +644,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents;
- final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
- final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator();
- final Iterator<ProvenanceEventRecord> autoTermIterator = autoTermEvents == null ? null : autoTermEvents.iterator();
-
- @Override
- public Iterator<ProvenanceEventRecord> iterator() {
- return new Iterator<ProvenanceEventRecord>() {
- @Override
- public boolean hasNext() {
- return recordsToSubmitIterator.hasNext() || (autoTermIterator != null && autoTermIterator.hasNext());
- }
-
- @Override
- public ProvenanceEventRecord next() {
- if (recordsToSubmitIterator.hasNext()) {
- final ProvenanceEventRecord rawEvent = recordsToSubmitIterator.next();
-
- // Update the Provenance Event Record with all of the info that we know about the event.
- // For SEND events, we do not want to update the FlowFile info on the Event, because the event should
- // reflect the FlowFile as it was sent to the remote system. However, for other events, we want to use
- // the representation of the FlowFile as it is committed, as this is the only way in which it really
- // exists in our system -- all other representations are volatile representations that have not been
- // exposed.
- return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND);
- } else if (autoTermIterator != null && autoTermIterator.hasNext()) {
- return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true);
- }
-
- throw new NoSuchElementException();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
- };
-
- provenanceRepo.registerEvents(iterable);
+
+ final List<ProvenanceEventRecord> enrichedEvents = new ArrayList<>();
+ for ( final ProvenanceEventRecord record : recordsToSubmit ) {
+ enrichedEvents.add(enrich(record, flowFileRecordMap, checkpoint.records, record.getEventType() != ProvenanceEventType.SEND));
+ }
+ for ( final ProvenanceEventRecord record : autoTermEvents ) {
+ enrichedEvents.add(enrich(record, flowFileRecordMap, checkpoint.records, true));
+ }
+
+ provenanceRepo.registerEvents(enrichedEvents);
}
private void updateEventContentClaims(final ProvenanceEventBuilder builder, final FlowFile flowFile, final StandardRepositoryRecord repoRecord) {
@@ -1140,7 +1113,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Connection conn = connections.get(context.getNextIncomingConnectionIndex() % connections.size());
final Set<FlowFileRecord> expired = new HashSet<>();
final FlowFileRecord flowFile = conn.getFlowFileQueue().poll(expired);
- removeExpired(expired, conn);
+
+ try {
+ removeExpired(expired, conn);
+ } catch (final IOException ioe) {
+ throw new ProcessException("Failed to update repositories to remove expired FlowFiles", ioe);
+ }
if (flowFile != null) {
registerDequeuedRecord(flowFile, conn);
@@ -1201,7 +1179,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
for (final Connection conn : connections) {
final Set<FlowFileRecord> expired = new HashSet<>();
final List<FlowFileRecord> newlySelected = poller.poll(conn.getFlowFileQueue(), expired);
- removeExpired(expired, conn);
+ try {
+ removeExpired(expired, conn);
+ } catch (final IOException ioe) {
+ throw new ProcessException("Failed to update repositories to remove expired FlowFiles", ioe);
+ }
if (newlySelected.isEmpty() && expired.isEmpty()) {
continue;
@@ -1571,7 +1553,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
- public void expireFlowFiles() {
+ public void expireFlowFiles() throws IOException {
final Set<FlowFileRecord> expired = new HashSet<>();
final FlowFileFilter filter = new FlowFileFilter() {
@Override
@@ -1589,7 +1571,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
}
- private void removeExpired(final Set<FlowFileRecord> flowFiles, final Connection connection) {
+ private void removeExpired(final Set<FlowFileRecord> flowFiles, final Connection connection) throws IOException {
if (flowFiles.isEmpty()) {
return;
}
@@ -1612,7 +1594,31 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
for (final FlowFileRecord flowFile : flowFiles) {
recordIdMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
+ }
+
+ final Set<ProvenanceEventRecord> expiredEvents = expiredReporter.getEvents();
+ final List<ProvenanceEventRecord> events = new ArrayList<>(expiredEvents.size());
+ for ( final ProvenanceEventRecord event : expiredEvents ) {
+ final StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(event);
+ final FlowFileRecord record = recordIdMap.get(event.getFlowFileUuid());
+ if (record == null) {
+ continue;
+ }
+
+ final ContentClaim claim = record.getContentClaim();
+ if (claim != null) {
+ enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+ enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
+ }
+ enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
+ events.add(enriched.build());
+ }
+
+ context.getProvenanceRepository().registerEvents(events);
+ context.getFlowFileRepository().updateRepository(expiredRecords);
+
+ for ( final FlowFileRecord flowFile : flowFiles ) {
final StandardRepositoryRecord record = new StandardRepositoryRecord(connection.getFlowFileQueue(), flowFile);
record.markForDelete();
expiredRecords.add(record);
@@ -1623,53 +1629,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final Object terminator = (connectable instanceof ProcessorNode) ? ((ProcessorNode) connectable).getProcessor() : connectable;
LOG.info("{} terminated by {} due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
}
-
- try {
- final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
- @Override
- public Iterator<ProvenanceEventRecord> iterator() {
- final Iterator<ProvenanceEventRecord> expiredEventIterator = expiredReporter.getEvents().iterator();
- final Iterator<ProvenanceEventRecord> enrichingIterator = new Iterator<ProvenanceEventRecord>() {
- @Override
- public boolean hasNext() {
- return expiredEventIterator.hasNext();
- }
-
- @Override
- public ProvenanceEventRecord next() {
- final ProvenanceEventRecord event = expiredEventIterator.next();
- final StandardProvenanceEventRecord.Builder enriched = new StandardProvenanceEventRecord.Builder().fromEvent(event);
- final FlowFileRecord record = recordIdMap.get(event.getFlowFileUuid());
- if (record == null) {
- return null;
- }
-
- final ContentClaim claim = record.getContentClaim();
- if (claim != null) {
- enriched.setCurrentContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
- enriched.setPreviousContentClaim(claim.getContainer(), claim.getSection(), claim.getId(), record.getContentClaimOffset(), record.getSize());
- }
-
- enriched.setAttributes(record.getAttributes(), Collections.<String, String>emptyMap());
- return enriched.build();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
-
- return enrichingIterator;
- }
- };
-
- context.getProvenanceRepository().registerEvents(iterable);
- context.getFlowFileRepository().updateRepository(expiredRecords);
- } catch (final IOException e) {
- LOG.error("Failed to update FlowFile Repository to record expired records due to {}", e);
- }
-
}
private InputStream getInputStream(final FlowFile flowFile, final ContentClaim claim, final long offset) throws ContentNotFoundException {
@@ -2438,7 +2397,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final ProvenanceEventRecord dropEvent = provenanceReporter.drop(suspectRecord.getCurrent(), nfe.getMessage() == null ? "Content Not Found" : nfe.getMessage());
if (dropEvent != null) {
- context.getProvenanceRepository().registerEvent(dropEvent);
+ try {
+ context.getProvenanceRepository().registerEvent(dropEvent);
+ } catch (final IOException ioe) {
+ LOG.error("{} Failed to register DROP Provenance event for {} when handling ContentNotFound error due to {}", this, suspectRecord.getCurrent(), ioe.toString());
+ if ( LOG.isDebugEnabled() ) {
+ LOG.error("", ioe);
+ }
+ }
}
if (missingClaim == registeredClaim) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
index a351a68..d0020b5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ExpireFlowFiles.java
@@ -16,9 +16,12 @@
*/
package org.apache.nifi.controller.tasks;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import javax.print.attribute.standard.Severity;
+
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
@@ -29,9 +32,12 @@ import org.apache.nifi.controller.repository.ProcessContext;
import org.apache.nifi.controller.repository.StandardProcessSession;
import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
import org.apache.nifi.controller.scheduling.ProcessContextFactory;
+import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This task runs through all Connectable Components and goes through its
@@ -39,7 +45,8 @@ import org.apache.nifi.util.FormatUtils;
* desired side effect of expiring old FlowFiles.
*/
public class ExpireFlowFiles implements Runnable {
-
+ private static final Logger logger = LoggerFactory.getLogger(ExpireFlowFiles.class);
+
private final FlowController flowController;
private final ProcessContextFactory contextFactory;
@@ -51,7 +58,19 @@ public class ExpireFlowFiles implements Runnable {
+ /**
+ * Expires old FlowFiles for the components of the root Process Group. Any Exception
+ * raised while doing so is caught here, logged, and reported as an ERROR bulletin.
+ */
@Override
public void run() {
final ProcessGroup rootGroup = flowController.getGroup(flowController.getRootGroupId());
- expireFlowFiles(rootGroup);
+
+ try {
+ expireFlowFiles(rootGroup);
+ } catch (final Exception e) {
+ logger.error("Failed to expire FlowFiles due to {}", e.toString());
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+
+ // The Severity imported by this file is javax.print.attribute.standard.Severity, whose
+ // getName() returns the attribute category name ("severity") instead of the level.
+ // NiFi's own Severity enum is referenced by its fully qualified name so that the
+ // bulletin is created with the level "ERROR".
+ flowController.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+ "FlowFile Expiration", org.apache.nifi.reporting.Severity.ERROR.name(), "Could not expire FlowFiles due to " + e));
+ }
}
private StandardProcessSession createSession(final Connectable connectable) {
@@ -60,7 +79,7 @@ public class ExpireFlowFiles implements Runnable {
return sessionFactory.createSession();
}
- private void expireFlowFiles(final Connectable connectable) {
+ private void expireFlowFiles(final Connectable connectable) throws IOException {
// determine if the incoming connections for this Connectable have Expiration configured.
boolean expirationConfigured = false;
for (final Connection incomingConn : connectable.getIncomingConnections()) {
@@ -80,7 +99,7 @@ public class ExpireFlowFiles implements Runnable {
session.commit();
}
- private void expireFlowFiles(final ProcessGroup group) {
+ private void expireFlowFiles(final ProcessGroup group) throws IOException {
for (final ProcessorNode procNode : group.getProcessors()) {
expireFlowFiles(procNode);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 1ff63c5..7ae156c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -65,6 +65,7 @@ import org.apache.nifi.provenance.MockProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -357,7 +358,7 @@ public class TestStandardProcessSession {
session.commit();
- final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
+ final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 1000);
// We should have a JOIN and 2 ATTRIBUTE_MODIFIED's
assertEquals(3, events.size());
@@ -412,7 +413,7 @@ public class TestStandardProcessSession {
session.remove(orig);
session.commit();
- final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
+ final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 1000);
assertEquals(2, events.size());
final ProvenanceEventRecord firstRecord = events.get(0);
@@ -838,7 +839,7 @@ public class TestStandardProcessSession {
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
- final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+ final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
@@ -857,7 +858,7 @@ public class TestStandardProcessSession {
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
- final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+ final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
@@ -883,7 +884,7 @@ public class TestStandardProcessSession {
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
session.commit();
- final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+ final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
@@ -904,7 +905,7 @@ public class TestStandardProcessSession {
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
session.commit();
- final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+ final List<StoredProvenanceEvent> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index b009581..56db464 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -697,9 +697,9 @@ public class ControllerFacade implements ControllerServiceProvider {
resultsDto.setTimeOffset(TimeZone.getDefault().getOffset(now.getTime()));
// get the oldest available event time
- final List<ProvenanceEventRecord> firstEvent = provenanceRepository.getEvents(0, 1);
- if (!firstEvent.isEmpty()) {
- resultsDto.setOldestEvent(new Date(firstEvent.get(0).getEventTime()));
+ final Long oldestEventTime = provenanceRepository.getEarliestEventTime();
+ if (oldestEventTime != null) {
+ resultsDto.setOldestEvent(new Date(oldestEventTime));
}
provenanceDto.setResults(resultsDto);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
index 5997281..4e9e9fb 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/pom.xml
@@ -36,5 +36,10 @@
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-queryparser</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
index 2130e73..7911d73 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/JournalingProvenanceRepository.java
@@ -55,6 +55,8 @@ import org.apache.nifi.provenance.journaling.partition.PartitionAction;
import org.apache.nifi.provenance.journaling.partition.PartitionManager;
import org.apache.nifi.provenance.journaling.partition.QueuingPartitionManager;
import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction;
+import org.apache.nifi.provenance.journaling.query.QueryManager;
+import org.apache.nifi.provenance.journaling.query.StandardQueryManager;
import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
import org.apache.nifi.provenance.journaling.toc.TocReader;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
@@ -67,15 +69,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JournalingProvenanceRepository implements ProvenanceEventRepository {
+ public static final String BLOCK_SIZE = "nifi.provenance.block.size";
+
private static final Logger logger = LoggerFactory.getLogger(JournalingProvenanceRepository.class);
private final JournalingRepositoryConfig config;
- private final PartitionManager partitionManager;
private final AtomicLong idGenerator = new AtomicLong(0L);
-
- private EventReporter eventReporter; // effectively final
private final ExecutorService executor;
+ private EventReporter eventReporter; // effectively final
+ private PartitionManager partitionManager; // effectively final
+ private QueryManager queryManager; // effectively final
public JournalingProvenanceRepository() throws IOException {
this(createConfig());
@@ -84,7 +88,6 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
public JournalingProvenanceRepository(final JournalingRepositoryConfig config) throws IOException {
this.config = config;
this.executor = Executors.newFixedThreadPool(config.getThreadPoolSize());
- this.partitionManager = new QueuingPartitionManager(config, executor);
}
@@ -110,7 +113,8 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
final boolean compressOnRollover = Boolean.parseBoolean(properties.getProperty(NiFiProperties.PROVENANCE_COMPRESS_ON_ROLLOVER));
final String indexedFieldString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
final String indexedAttrString = properties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);
-
+ final int blockSize = properties.getIntegerProperty(BLOCK_SIZE, 1000);
+
final Boolean alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.provenance.repository.always.sync", "false"));
final List<SearchableField> searchableFields = SearchableFieldParser.extractSearchableFields(indexedFieldString, true);
@@ -137,7 +141,8 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
config.setMaxStorageCapacity(maxStorageBytes);
config.setThreadPoolSize(queryThreads);
config.setPartitionCount(journalCount);
-
+ config.setBlockSize(blockSize);
+
if (shardSize != null) {
config.setDesiredIndexSize(DataUnit.parseDataSize(shardSize, DataUnit.B).longValue());
}
@@ -150,6 +155,9 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
+ /**
+ * Stores the given EventReporter and creates the PartitionManager and the QueryManager
+ * that this repository delegates to.
+ *
+ * @param eventReporter the reporter to retain for this repository
+ * @throws IOException declared by the signature; NOTE(review): presumably raised when the
+ * partitions cannot be set up - confirm against QueuingPartitionManager
+ */
@Override
public synchronized void initialize(final EventReporter eventReporter) throws IOException {
this.eventReporter = eventReporter;
+
+ this.partitionManager = new QueuingPartitionManager(config, executor);
+ // the last argument is StandardQueryManager's maxConcurrentQueries limit
+ this.queryManager = new StandardQueryManager(partitionManager, config, 10);
}
@Override
@@ -328,14 +336,12 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
+ /**
+ * Submits the given query by delegating to the QueryManager.
+ * NOTE(review): queryManager is only assigned in initialize(EventReporter), so this
+ * appears to require that initialize has already been called - confirm.
+ */
@Override
public QuerySubmission submitQuery(final Query query) {
- // TODO Auto-generated method stub
- return null;
+ return queryManager.submitQuery(query);
}
+ /**
+ * Looks up a previously submitted query by delegating to the QueryManager.
+ * NOTE(review): queryManager is only assigned in initialize(EventReporter), so this
+ * appears to require that initialize has already been called - confirm.
+ */
@Override
public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) {
- // TODO Auto-generated method stub
- return null;
+ return queryManager.retrieveQuerySubmission(queryIdentifier);
}
@Override
@@ -364,7 +370,10 @@ public class JournalingProvenanceRepository implements ProvenanceEventRepository
+ /**
+ * Shuts down the PartitionManager, if one has been created, and then the executor.
+ *
+ * @throws IOException declared by the signature
+ */
@Override
public void close() throws IOException {
- partitionManager.shutdown();
+ try {
+ // partitionManager stays null until initialize(EventReporter) has been called
+ if ( partitionManager != null ) {
+ partitionManager.shutdown();
+ }
+ } finally {
+ // always release the thread pool, even if shutting down the partitions fails
executor.shutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
index 6dd7be9..8998932 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/config/JournalingRepositoryConfig.java
@@ -34,7 +34,7 @@ public class JournalingRepositoryConfig {
private long journalCapacity = 1024L * 1024L * 5L; // 5 MB
private long desiredIndexBytes = 1024L * 1024L * 500L; // 500 MB
private int partitionCount = 16;
- private int blockSize = 100;
+ private int blockSize = 5000;
private List<SearchableField> searchableFields = new ArrayList<>();
private List<SearchableField> searchableAttributes = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
index 82ef39b..2ec5131 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/journals/StandardJournalReader.java
@@ -163,7 +163,7 @@ public class StandardJournalReader implements JournalReader {
}
}
- throw new IOException("Could not find event with ID " + eventId);
+ throw new IOException("Could not find event with ID " + eventId + " in " + this);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
index 51f84a2..651c41e 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/JournalingPartition.java
@@ -214,6 +214,7 @@ public class JournalingPartition implements Partition {
final File journalFile = new File(journalsDir, firstEventId + JOURNAL_FILE_EXTENSION);
journalWriter = new StandardJournalWriter(firstEventId, journalFile, false, new StandardEventSerializer());
tocWriter = new StandardTocWriter(QueryUtils.getTocFile(journalFile), false);
+ tocWriter.addBlockOffset(journalWriter.getSize());
numEventsAtEndOfLastBlock = 0;
}
@@ -421,4 +422,9 @@ public class JournalingPartition implements Partition {
public Long getEarliestEventTime() throws IOException {
return earliestEventTime;
}
+
+ /**
+ * Identifies this partition by its section name, e.g. for use in log messages.
+ */
+ @Override
+ public String toString() {
+ final StringBuilder description = new StringBuilder("Partition[section=");
+ description.append(sectionName).append("]");
+ return description.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
index 4ac0fc6..51d90e2 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/partition/QueuingPartitionManager.java
@@ -35,9 +35,13 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
import org.apache.nifi.util.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class QueuingPartitionManager implements PartitionManager {
-
+
+ private static final Logger logger = LoggerFactory.getLogger(QueuingPartitionManager.class);
+
private final JournalingRepositoryConfig config;
private final BlockingQueue<Partition> partitionQueue;
private final JournalingPartition[] partitionArray;
@@ -180,6 +184,39 @@ public class QueuingPartitionManager implements PartitionManager {
+ /**
+ * Performs the given action against every partition, submitting one task per partition
+ * to the executor.
+ *
+ * @param action the action to perform against each partition
+ * @param async if <code>true</code>, returns once all tasks have been submitted and failures
+ * are only logged; if <code>false</code>, waits for every task and throws a RuntimeException
+ * for the first failure that is encountered while waiting
+ */
@Override
public void withEachPartition(final VoidPartitionAction action, final boolean async) {
+ // TODO: Do not use blacklisted partitions.
+ final Map<Partition, Future<?>> futures = new HashMap<>(partitionArray.length);
+ for ( final Partition partition : partitionArray ) {
+ final Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ action.perform(partition);
+ } catch (final Throwable t) {
+ logger.error("Failed to perform action against " + partition + " due to " + t);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", t);
+ }
+
+ // Re-throw so that the failure is recorded in the Future. If it were only
+ // logged, a synchronous caller's Future.get() could never observe it.
+ if ( t instanceof RuntimeException ) {
+ throw (RuntimeException) t;
+ }
+ if ( t instanceof Error ) {
+ throw (Error) t;
+ }
+ throw new RuntimeException(t);
+ }
+ }
+ };
+
+ final Future<?> future = executor.submit(runnable);
+ futures.put(partition, future);
+ }
+
+ if ( !async ) {
+ for ( final Map.Entry<Partition, Future<?>> entry : futures.entrySet() ) {
+ try {
+ // throw any exception thrown by runnable
+ entry.getValue().get();
+ } catch (final ExecutionException ee) {
+ final Throwable cause = ee.getCause();
+ throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
+ } catch (final InterruptedException e) {
+ // restore the interrupt status so that callers further up can still see it
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
new file mode 100644
index 0000000..4edc6ad
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/QueryManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.query;
+
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+
+/**
+ * Accepts provenance queries for asynchronous processing and allows the resulting
+ * submissions to be looked up again by query identifier.
+ */
+public interface QueryManager {
+ /**
+ * Submits an asynchronous request to process the given query, returning a
+ * {@link QuerySubmission} that can be used to fetch the results at a later time
+ *
+ * @param query the query to process
+ * @return the submission that tracks the given query
+ */
+ QuerySubmission submitQuery(Query query);
+
+ /**
+ * Returns the QuerySubmission associated with the given identifier. As implemented
+ * by StandardQueryManager, the submission is returned whether or not the query has
+ * finished processing, and <code>null</code> is returned if no submission is
+ * registered for the identifier.
+ *
+ * @param queryIdentifier the identifier of a previously submitted query
+ *
+ * @return the matching QuerySubmission, or <code>null</code> if none is registered
+ */
+ QuerySubmission retrieveQuerySubmission(String queryIdentifier);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
new file mode 100644
index 0000000..4cce231
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/java/org/apache/nifi/provenance/journaling/query/StandardQueryManager.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling.query;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.provenance.AsyncQuerySubmission;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
+import org.apache.nifi.provenance.journaling.JournaledStorageLocation;
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.journaling.index.EventIndexSearcher;
+import org.apache.nifi.provenance.journaling.index.QueryUtils;
+import org.apache.nifi.provenance.journaling.index.SearchResult;
+import org.apache.nifi.provenance.journaling.journals.JournalReader;
+import org.apache.nifi.provenance.journaling.journals.StandardJournalReader;
+import org.apache.nifi.provenance.journaling.partition.Partition;
+import org.apache.nifi.provenance.journaling.partition.PartitionManager;
+import org.apache.nifi.provenance.journaling.partition.VoidPartitionAction;
+import org.apache.nifi.provenance.journaling.toc.StandardTocReader;
+import org.apache.nifi.provenance.journaling.toc.TocReader;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * QueryManager that runs each submitted query against every partition of the given
+ * PartitionManager and keeps the resulting submissions, keyed by query identifier,
+ * until they are canceled.
+ */
+public class StandardQueryManager implements QueryManager {
+ private static final Logger logger = LoggerFactory.getLogger(StandardQueryManager.class);
+
+ private final int maxConcurrentQueries;
+ private final JournalingRepositoryConfig config;
+ private final PartitionManager partitionManager;
+ private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>();
+
+ /**
+ * @param partitionManager provides the partitions that queries are run against
+ * @param config the repository configuration
+ * @param maxConcurrentQueries the maximum number of submissions that may be registered at once
+ */
+ public StandardQueryManager(final PartitionManager partitionManager, final JournalingRepositoryConfig config, final int maxConcurrentQueries) {
+ this.config = config;
+ this.maxConcurrentQueries = maxConcurrentQueries;
+ this.partitionManager = partitionManager;
+ }
+
+ /**
+ * Registers the given query and asynchronously runs it against every partition.
+ *
+ * @param query the query to run
+ * @return the submission that tracks the query
+ * @throws IllegalStateException if the maximum number of submissions is already registered
+ * @throws IllegalArgumentException if the query's start date is after its end date
+ */
+ @Override
+ public QuerySubmission submitQuery(final Query query) {
+ final int numQueries = querySubmissionMap.size();
+ // '>=' so that no more than maxConcurrentQueries submissions are registered at once
+ if (numQueries >= maxConcurrentQueries) {
+ throw new IllegalStateException("Cannot process query because there are currently " + numQueries + " queries whose results have not been deleted (likely due to poorly behaving clients not issuing DELETE requests). Please try again later.");
+ }
+
+ if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) {
+ throw new IllegalArgumentException("Query End Time cannot be before Query Start Time");
+ }
+
+ if (query.getSearchTerms().isEmpty() && query.getStartDate() == null && query.getEndDate() == null) {
+ // NOTE(review): nothing in this class runs this submission or updates its result,
+ // so it looks like an unconstrained query never completes - confirm whether the
+ // most recent events were meant to be fetched here.
+ final AsyncQuerySubmission result = newSubmission(query, 1);
+
+ querySubmissionMap.put(query.getIdentifier(), result);
+ return result;
+ }
+
+ final AtomicInteger retrievalCount = new AtomicInteger(query.getMaxResults());
+ final AsyncQuerySubmission submission = newSubmission(query, config.getPartitionCount());
+
+ querySubmissionMap.put(query.getIdentifier(), submission);
+
+ partitionManager.withEachPartition(new VoidPartitionAction() {
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void perform(final Partition partition) throws IOException {
+ logger.debug("Running {} against {}", query, partition);
+
+ try (final EventIndexSearcher searcher = partition.newIndexSearcher()) {
+ final SearchResult searchResult = searcher.search(query);
+ logger.debug("{} has {} hits against {} over {} files", query, searchResult.getTotalCount(), partition, searchResult.getLocations().size());
+
+ final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>();
+ final Map<File, List<JournaledStorageLocation>> locationMap = QueryUtils.orderLocations((List) searchResult.getLocations(), config);
+ for ( final Map.Entry<File, List<JournaledStorageLocation>> entry : locationMap.entrySet() ) {
+ final File journalFile = entry.getKey();
+ final List<JournaledStorageLocation> locations = entry.getValue();
+
+ if ( retrievalCount.get() <= 0 ) {
+ break;
+ }
+
+ try (final JournalReader reader = new StandardJournalReader(journalFile);
+ final TocReader tocReader = new StandardTocReader(QueryUtils.getTocFile(journalFile))) {
+
+ for ( final JournaledStorageLocation location : locations ) {
+ // Reserve a slot before reading the event. The counter is shared by all
+ // partitions, so checking it only after the read would allow concurrent
+ // partitions to retrieve more events than the query's max results.
+ if ( retrievalCount.getAndDecrement() <= 0 ) {
+ break;
+ }
+
+ final long blockOffset = tocReader.getBlockOffset(location.getBlockIndex());
+ final ProvenanceEventRecord event = reader.getEvent(blockOffset, location.getEventId());
+ matchingRecords.add(new JournaledProvenanceEvent(event, location));
+ }
+ }
+ }
+
+ logger.debug("Finished executing {} against {}", query, partition);
+ submission.getResult().update(matchingRecords, searchResult.getTotalCount());
+ } catch (final Exception e) {
+ submission.getResult().setError("Failed to query " + partition + " due to " + e.toString());
+ throw e;
+ }
+ }
+ }, true);
+
+ return submission;
+ }
+
+ /**
+ * Returns the submission registered for the given query identifier, or
+ * <code>null</code> if no submission is registered for it.
+ */
+ @Override
+ public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) {
+ return querySubmissionMap.get(queryIdentifier);
+ }
+
+ /**
+ * Creates a submission for the given query that removes itself from the submission map
+ * when it is canceled, so that canceled queries do not count against the limit.
+ */
+ private AsyncQuerySubmission newSubmission(final Query query, final int numSteps) {
+ return new AsyncQuerySubmission(query, numSteps) {
+ @Override
+ public void cancel() {
+ super.cancel();
+ querySubmissionMap.remove(query.getIdentifier());
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository
new file mode 100644
index 0000000..e224c51
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/main/resources/META-INF/services/org.apache.nifi.provenance.ProvenanceEventRepository
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.provenance.journaling.JournalingProvenanceRepository
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
new file mode 100644
index 0000000..a547a8a
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestJournalingProvenanceRepository.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance.journaling;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.nifi.provenance.SearchableFields;
+import org.apache.nifi.provenance.StoredProvenanceEvent;
+import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
+import org.apache.nifi.provenance.search.Query;
+import org.apache.nifi.provenance.search.QueryResult;
+import org.apache.nifi.provenance.search.QuerySubmission;
+import org.apache.nifi.provenance.search.SearchTerms;
+import org.apache.nifi.provenance.search.SearchableField;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestJournalingProvenanceRepository {
+
+
+ @BeforeClass
+ public static void setupLogging() {
+ System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance.journaling", "DEBUG");
+ }
+
+ @Test
+ public void testStoreAndRetrieve() throws IOException {
+ final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+ final Map<String, File> containers = new HashMap<>();
+ containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+ containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+ config.setContainers(containers);
+ config.setPartitionCount(3);
+
+ try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+ repo.initialize(null);
+ final Map<String, String> attributes = new HashMap<>();
+
+ for (int i=0; i < 10; i++) {
+ attributes.put("i", String.valueOf(i));
+ repo.registerEvent(TestUtil.generateEvent(i, attributes));
+ }
+
+ // retrieve records one at a time.
+ for (int i=0; i < 10; i++) {
+ final StoredProvenanceEvent event = repo.getEvent(i);
+ assertNotNull(event);
+ assertEquals((long) i, event.getEventId());
+ assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+ }
+
+ final List<StoredProvenanceEvent> events = repo.getEvents(0, 1000);
+ assertNotNull(events);
+ assertEquals(10, events.size());
+ for (int i=0; i < 10; i++) {
+ final StoredProvenanceEvent event = events.get(i);
+ assertNotNull(event);
+ assertEquals((long) i, event.getEventId());
+ assertEquals("00000000-0000-0000-0000-00000000000" + i, event.getFlowFileUuid());
+ }
+ } finally {
+ for ( final File file : containers.values() ) {
+ if ( file.exists() ) {
+ FileUtils.deleteFile(file, true);
+ }
+ }
+ }
+ }
+
+
+ @Test(timeout=10000000)
+ public void testSearchByUUID() throws IOException, InterruptedException {
+ final JournalingRepositoryConfig config = new JournalingRepositoryConfig();
+ final Map<String, File> containers = new HashMap<>();
+ containers.put("container1", new File("target/" + UUID.randomUUID().toString()));
+ containers.put("container2", new File("target/" + UUID.randomUUID().toString()));
+ config.setContainers(containers);
+
+ config.setPartitionCount(3);
+ config.setSearchableFields(Arrays.asList(new SearchableField[] {
+ SearchableFields.FlowFileUUID
+ }));
+
+ try (final JournalingProvenanceRepository repo = new JournalingProvenanceRepository(config)) {
+ repo.initialize(null);
+
+ final Map<String, String> attributes = new HashMap<>();
+
+ for (int i=0; i < 10; i++) {
+ attributes.put("i", String.valueOf(i));
+ repo.registerEvent(TestUtil.generateEvent(i, attributes));
+ }
+
+ final Query query = new Query("query");
+ query.addSearchTerm(SearchTerms.newSearchTerm(SearchableFields.FlowFileUUID, "00000000-0000-0000-0000-000000000005"));
+ final QuerySubmission submission = repo.submitQuery(query);
+ assertNotNull(submission);
+
+ final QueryResult result = submission.getResult();
+ while ( !result.isFinished() ) {
+ Thread.sleep(50L);
+ }
+
+ assertNull(result.getError());
+ final List<StoredProvenanceEvent> matches = result.getMatchingEvents();
+ assertNotNull(matches);
+ assertEquals(1, matches.size());
+
+ final StoredProvenanceEvent event = matches.get(0);
+ assertEquals(5, event.getEventId());
+ assertEquals("00000000-0000-0000-0000-000000000005", event.getFlowFileUuid());
+ } finally {
+ for ( final File file : containers.values() ) {
+ FileUtils.deleteFile(file, true);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b95e7569/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
index 45b7338..6d05f7a 100644
--- a/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
+++ b/nifi/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-journaling-provenance-repository/src/test/java/org/apache/nifi/provenance/journaling/TestUtil.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.provenance.journaling;
+import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -23,7 +25,12 @@ import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
public class TestUtil {
+
public static ProvenanceEventRecord generateEvent(final long id) {
+ // No attributes were supplied: delegate to the two-argument overload with an empty map.
+ final Map<String, String> noAttributes = Collections.emptyMap();
+ return generateEvent(id, noAttributes);
+ }
+
+ public static ProvenanceEventRecord generateEvent(final long id, final Map<String, String> attributes) {
// Create prov event to add to the stream
final ProvenanceEventRecord event = new StandardProvenanceEventRecord.Builder()
.setEventType(ProvenanceEventType.CREATE)
@@ -34,6 +41,7 @@ public class TestUtil {
.setFlowFileEntryDate(System.currentTimeMillis() - 1000L)
.setLineageStartDate(System.currentTimeMillis() - 2000L)
.setCurrentContentClaim(null, null, null, null, 0L)
+ .setAttributes(null, attributes == null ? Collections.<String, String>emptyMap() : attributes)
.build();
return event;