You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2017/04/27 21:03:33 UTC
[1/4] nifi-minifi git commit: MINIFI-279: Makes test classes naming
consistent.
Repository: nifi-minifi
Updated Branches:
refs/heads/master 25298e90b -> b059afef2
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepositoryTest.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepositoryTest.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepositoryTest.java
new file mode 100644
index 0000000..08d5fc6
--- /dev/null
+++ b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/MiNiFiPersistentProvenanceRepositoryTest.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.provenance;
+
+import org.apache.lucene.queryparser.classic.ParseException;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository.MethodNotSupportedException;
+import org.apache.nifi.provenance.serialization.RecordReader;
+import org.apache.nifi.provenance.serialization.RecordReaders;
+import org.apache.nifi.provenance.serialization.RecordWriter;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.stream.io.DataOutputStream;
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPOutputStream;
+
+import static org.apache.nifi.provenance.TestUtil.createFlowFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class MiNiFiPersistentProvenanceRepositoryTest {
+
+ @Rule
+ public TestName name = new TestName();
+
+ private MiNiFiPersistentProvenanceRepository repo;
+ private RepositoryConfiguration config;
+
+ public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
+ private EventReporter eventReporter;
+ private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
+
+ /**
+  * Builds a new repository configuration that uses a uniquely named storage directory under
+  * {@code target/storage}, stores it in the {@code config} field and returns it.
+  */
+ private RepositoryConfiguration createConfiguration() {
+     final RepositoryConfiguration fresh = new RepositoryConfiguration();
+     config = fresh;
+
+     final String uniqueDirName = UUID.randomUUID().toString();
+     fresh.addStorageDirectory(new File("target/storage/" + uniqueDirName));
+     fresh.setCompressOnRollover(true);
+     fresh.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
+     fresh.setCompressionBlockBytes(100);
+     return fresh;
+ }
+
+ /** Sets the slf4j simple-logger system property for the provenance package to DEBUG. */
+ @BeforeClass
+ public static void setLogLevel() {
+     final String loggerKey = "org.slf4j.simpleLogger.log.org.apache.nifi.provenance";
+     final String level = "DEBUG";
+     System.setProperty(loggerKey, level);
+ }
+
+ /**
+  * Per-test setup: empties the list of captured events and installs an {@link EventReporter}
+  * that records every reported event in {@code reportedEvents} and echoes it to standard output.
+  * Despite its name, this method does not print the name of the running test.
+  */
+ @Before
+ public void printTestName() {
+     reportedEvents.clear();
+     eventReporter = new EventReporter() {
+         private static final long serialVersionUID = 1L;
+
+         @Override
+         public void reportEvent(Severity severity, String category, String message) {
+             final ReportedEvent captured = new ReportedEvent(severity, category, message);
+             reportedEvents.add(captured);
+
+             final String line = severity + " : " + category + " : " + message;
+             System.out.println(line);
+         }
+     };
+ }
+
+ /**
+  * Per-test teardown: closes the repository (best effort) and deletes every storage directory
+  * of the configuration created by the test.
+  *
+  * @throws IOException if a storage directory still cannot be deleted on the third attempt
+  */
+ @After
+ public void closeRepo() throws IOException {
+     if (repo != null) {
+         try {
+             repo.close();
+         } catch (final IOException ignored) {
+             // Best effort: a failure while closing is deliberately not propagated from teardown.
+         }
+     }
+
+     // A test that failed before createConfiguration() was called leaves nothing to clean up.
+     if (config == null) {
+         return;
+     }
+
+     // Delete all of the storage files. We do this in order to clean up the tons of files that
+     // we create but also to ensure that we have closed all of the file handles. If we leave any
+     // streams open, for instance, this will throw an IOException, causing our unit test to fail.
+     final int maxAttempts = 3;
+     for (final File storageDir : config.getStorageDirectories()) {
+         for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+             try {
+                 FileUtils.deleteFile(storageDir, true);
+                 break;
+             } catch (final IOException ioe) {
+                 // if there is a virus scanner, etc. running in the background we may not be able to
+                 // delete the file. Wait a sec and try again.
+                 if (attempt == maxAttempts) {
+                     throw ioe;
+                 }
+                 try {
+                     Thread.sleep(1000L);
+                 } catch (final InterruptedException ie) {
+                     // Keep the interrupt visible to the caller instead of silently dropping it.
+                     Thread.currentThread().interrupt();
+                 }
+             }
+         }
+     }
+ }
+
+
+
+ /** Returns the event reporter currently held in the {@code eventReporter} field. */
+ private EventReporter getEventReporter() {
+     return this.eventReporter;
+ }
+
+
+ /**
+  * Registers the same event ten times, closes the repository, reopens a new repository on the
+  * same configuration and checks that the max event id and all ten events are recovered.
+  */
+ @Test
+ public void testAddAndRecover() throws IOException, InterruptedException {
+     final RepositoryConfiguration repoConfig = createConfiguration();
+     repoConfig.setMaxEventFileCapacity(1L);
+     repoConfig.setMaxEventFileLife(1, TimeUnit.SECONDS);
+     repo = new MiNiFiPersistentProvenanceRepository(repoConfig, DEFAULT_ROLLOVER_MILLIS);
+     repo.initialize(getEventReporter(), null, null);
+
+     final Map<String, String> expectedAttributes = new HashMap<>();
+     expectedAttributes.put("abc", "xyz");
+     expectedAttributes.put("xyz", "abc");
+     expectedAttributes.put("uuid", UUID.randomUUID().toString());
+
+     final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder();
+     eventBuilder.setEventTime(System.currentTimeMillis());
+     eventBuilder.setEventType(ProvenanceEventType.RECEIVE);
+     eventBuilder.setTransitUri("nifi://unit-test");
+     eventBuilder.fromFlowFile(createFlowFile(3L, 3000L, expectedAttributes));
+     eventBuilder.setComponentId("1234");
+     eventBuilder.setComponentType("dummy processor");
+     final ProvenanceEventRecord event = eventBuilder.build();
+
+     int registered = 0;
+     while (registered < 10) {
+         repo.registerEvent(event);
+         registered++;
+     }
+
+     Assert.assertEquals("Did not establish the correct, Max Event Id", 9, repo.getMaxEventId().intValue());
+
+     Thread.sleep(1000L);
+
+     repo.close();
+     Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
+
+     // Reopen on the same configuration and read back what was persisted.
+     repo = new MiNiFiPersistentProvenanceRepository(repoConfig, DEFAULT_ROLLOVER_MILLIS);
+     repo.initialize(getEventReporter(), null, null);
+     final List<ProvenanceEventRecord> recovered = repo.getEvents(0L, 12);
+
+     Assert.assertEquals("Did not establish the correct, Max Event Id through recovery after reloading", 9, repo.getMaxEventId().intValue());
+
+     assertEquals(10, recovered.size());
+     long expectedId = 0L;
+     for (final ProvenanceEventRecord rec : recovered) {
+         assertEquals(expectedId++, rec.getEventId());
+         assertEquals("nifi://unit-test", rec.getTransitUri());
+         assertEquals(ProvenanceEventType.RECEIVE, rec.getEventType());
+         assertEquals(expectedAttributes, rec.getAttributes());
+     }
+ }
+
+
+ /**
+  * Registers ten events with compress-on-rollover enabled, waits for a rollover and checks
+  * that {@code 0.prov.gz} exists in the storage directory.
+  */
+ @Test
+ public void testCompressOnRollover() throws IOException, InterruptedException, ParseException {
+     final RepositoryConfiguration repoConfig = createConfiguration();
+     repoConfig.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+     repoConfig.setCompressOnRollover(true);
+     repo = new MiNiFiPersistentProvenanceRepository(repoConfig, DEFAULT_ROLLOVER_MILLIS);
+     repo.initialize(getEventReporter(), null, null);
+
+     final String flowFileUuid = "00000000-0000-0000-0000-000000000000";
+     final Map<String, String> attributes = new HashMap<>();
+     attributes.put("abc", "xyz");
+     attributes.put("xyz", "abc");
+     attributes.put("filename", "file-" + flowFileUuid);
+     attributes.put("uuid", flowFileUuid);
+
+     final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder();
+     eventBuilder.setEventTime(System.currentTimeMillis());
+     eventBuilder.setEventType(ProvenanceEventType.RECEIVE);
+     eventBuilder.setTransitUri("nifi://unit-test");
+     eventBuilder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+     eventBuilder.setComponentId("1234");
+     eventBuilder.setComponentType("dummy processor");
+
+     for (int flowFileId = 0; flowFileId < 10; flowFileId++) {
+         eventBuilder.fromFlowFile(createFlowFile(flowFileId, 3000L, attributes));
+         repo.registerEvent(eventBuilder.build());
+     }
+
+     repo.waitForRollover();
+
+     final File compressedLogFile = new File(repoConfig.getStorageDirectories().get(0), "0.prov.gz");
+     assertTrue(compressedLogFile.exists());
+ }
+
+ /**
+  * Verifies that submitting a lineage computation to the repository is rejected with a
+  * {@link MethodNotSupportedException}.
+  */
+ @Test(expected = MethodNotSupportedException.class)
+ public void testLineageRequestNotSupported() throws IOException, InterruptedException, ParseException {
+     final RepositoryConfiguration config = createConfiguration();
+     config.setMaxRecordLife(3, TimeUnit.SECONDS);
+     config.setMaxStorageCapacity(1024L * 1024L);
+     config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+     config.setMaxEventFileCapacity(1024L * 1024L);
+     config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+
+     repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+     repo.initialize(getEventReporter(), null, null);
+
+     // The UUID is the only input of the call under test; the attribute map that used to be
+     // built here was never passed anywhere and has been removed.
+     final String uuid = "00000000-0000-0000-0000-000000000001";
+     repo.submitLineageComputation(uuid, null);
+ }
+
+
+ /**
+  * Registers ten events, closes the repository, opens a second repository on the same
+  * configuration and checks that the next registered event is assigned id 10.
+  */
+ @Test
+ public void testCorrectProvenanceEventIdOnRestore() throws IOException {
+     final RepositoryConfiguration repoConfig = createConfiguration();
+     repoConfig.setMaxEventFileLife(1, TimeUnit.SECONDS);
+     repo = new MiNiFiPersistentProvenanceRepository(repoConfig, DEFAULT_ROLLOVER_MILLIS);
+     repo.initialize(getEventReporter(), null, null);
+
+     final String initialUuid = "00000000-0000-0000-0000-000000000000";
+     final Map<String, String> attributes = new HashMap<>();
+     attributes.put("abc", "xyz");
+     attributes.put("xyz", "abc");
+     attributes.put("filename", "file-" + initialUuid);
+     attributes.put("uuid", initialUuid);
+
+     final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder();
+     eventBuilder.setEventTime(System.currentTimeMillis());
+     eventBuilder.setEventType(ProvenanceEventType.RECEIVE);
+     eventBuilder.setTransitUri("nifi://unit-test");
+     eventBuilder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+     eventBuilder.setComponentId("1234");
+     eventBuilder.setComponentType("dummy processor");
+
+     for (int n = 0; n < 10; n++) {
+         eventBuilder.fromFlowFile(createFlowFile(n, 3000L, attributes));
+         attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + n);
+         repo.registerEvent(eventBuilder.build());
+     }
+
+     repo.close();
+
+     // Second repository over the same storage: ids must continue after the first ten events.
+     final MiNiFiPersistentProvenanceRepository reopened = new MiNiFiPersistentProvenanceRepository(repoConfig, DEFAULT_ROLLOVER_MILLIS);
+     reopened.initialize(getEventReporter(), null, null);
+
+     try {
+         reopened.registerEvent(eventBuilder.build());
+         reopened.waitForRollover();
+
+         final ProvenanceEventRecord eleventh = reopened.getEvent(10L);
+         assertNotNull(eleventh);
+         assertEquals(10, eleventh.getEventId());
+         Assert.assertEquals(10, reopened.getMaxEventId().intValue());
+     } finally {
+         reopened.close();
+     }
+ }
+
+ /**
+  * Here the event file is simply corrupted by virtue of not having any event
+  * records while having correct headers: the deleted {@code 10.prov.gz} is re-created
+  * with only a UTF string and an int written through a GZIP stream. The ten events of
+  * the other event file must still be returned.
+  */
+ @Test
+ public void testWithWithEventFileMissingRecord() throws Exception {
+     final File eventFile = this.prepCorruptedEventFileTests();
+
+     final DataOutputStream out = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
+     try {
+         out.writeUTF("BlahBlah");
+         out.writeInt(4);
+     } finally {
+         // Always release the file handle, even if a write fails, so teardown can delete the file.
+         out.close();
+     }
+     assertTrue(eventFile.exists());
+
+     final List<ProvenanceEventRecord> events = repo.getEvents(0, 100);
+     assertEquals(10, events.size());
+ }
+
+ /**
+  * Here the event file is corrupted by virtue of carrying no content: a GZIP stream is
+  * opened on the re-created file and closed again without anything being written through it.
+  * The ten events of the other event file must still be returned.
+  */
+ @Test
+ public void testWithWithEventFileCorrupted() throws Exception {
+     final File corruptedFile = this.prepCorruptedEventFileTests();
+
+     // Open and immediately close the stream so that no header or record data is written.
+     new DataOutputStream(new GZIPOutputStream(new FileOutputStream(corruptedFile))).close();
+
+     final List<ProvenanceEventRecord> readable = repo.getEvents(0, 100);
+     assertEquals(10, readable.size());
+ }
+
+ /**
+  * Shared setup for the corrupted-event-file tests: registers twenty CREATE events, waiting
+  * for a rollover after the tenth and again after the last one, then deletes
+  * {@code 10.prov.gz} from the storage directory.
+  *
+  * @return the just-deleted {@code 10.prov.gz} file, which the caller re-creates with bad content
+  */
+ private File prepCorruptedEventFileTests() throws Exception {
+     final RepositoryConfiguration repoConfig = createConfiguration();
+     repoConfig.setMaxStorageCapacity(1024L * 1024L);
+     repoConfig.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
+     repoConfig.setMaxEventFileCapacity(1024L * 1024L);
+     repoConfig.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
+     repoConfig.setDesiredIndexSize(10);
+
+     repo = new MiNiFiPersistentProvenanceRepository(repoConfig, DEFAULT_ROLLOVER_MILLIS);
+     repo.initialize(getEventReporter(), null, null);
+
+     final String flowFileUuid = UUID.randomUUID().toString();
+     for (int eventIndex = 0; eventIndex < 20; eventIndex++) {
+         final ProvenanceEventRecord event = repo.eventBuilder()
+             .fromFlowFile(mock(FlowFile.class))
+             .setEventType(ProvenanceEventType.CREATE)
+             .setComponentId("foo-" + eventIndex)
+             .setComponentType("myComponent")
+             .setFlowFileUUID(flowFileUuid)
+             .build();
+         repo.registerEvent(event);
+
+         // After the tenth event, wait for a rollover and pause before registering the rest.
+         final boolean firstBatchDone = (eventIndex == 9);
+         if (firstBatchDone) {
+             repo.waitForRollover();
+             Thread.sleep(2000L);
+         }
+     }
+     repo.waitForRollover();
+
+     final File storageDir = repoConfig.getStorageDirectories().get(0);
+     final File secondEventFile = new File(storageDir, "10.prov.gz");
+     assertTrue(secondEventFile.delete());
+     return secondEventFile;
+ }
+
+ /**
+  * Checks that registering an event blocks while the repository reports a high journal
+  * count and resumes once the reported count drops again. The journal count seen by the
+  * repository is controlled by the test through an overridden {@code getJournalCount()}.
+  */
+ @Test
+ public void testBackPressure() throws IOException, InterruptedException {
+ final RepositoryConfiguration config = createConfiguration();
+ config.setMaxEventFileCapacity(1L); // force rollover on each record.
+ config.setJournalCount(1);
+
+ // Value returned by the overridden getJournalCount() below; changed by the test at runtime.
+ final AtomicInteger journalCountRef = new AtomicInteger(0);
+
+ repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+ @Override
+ protected int getJournalCount() {
+ return journalCountRef.get();
+ }
+ };
+ repo.initialize(getEventReporter(), null, null);
+
+ final Map<String, String> attributes = new HashMap<>();
+ final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+ builder.setEventTime(System.currentTimeMillis());
+ builder.setEventType(ProvenanceEventType.RECEIVE);
+ builder.setTransitUri("nifi://unit-test");
+ attributes.put("uuid", UUID.randomUUID().toString());
+ builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+ builder.setComponentId("1234");
+ builder.setComponentType("dummy processor");
+
+ // ensure that we can register the events.
+ for (int i = 0; i < 10; i++) {
+ builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
+ repo.registerEvent(builder.build());
+ }
+
+ // set number of journals to 6 so that we will block.
+ journalCountRef.set(6);
+
+ // Register one more event on a separate thread and record how long the call took.
+ final AtomicLong threadNanos = new AtomicLong(0L);
+ final Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ final long start = System.nanoTime();
+ builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
+ repo.registerEvent(builder.build());
+ threadNanos.set(System.nanoTime() - start);
+ }
+ });
+ t.start();
+
+ // Keep the reported journal count high for 1.5 seconds before lowering it again.
+ Thread.sleep(1500L);
+
+ journalCountRef.set(1);
+ t.join();
+
+ final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
+ assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
+
+ // With the reported count back at 1, one more event is registered from the test thread.
+ builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
+ attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
+ repo.registerEvent(builder.build());
+ }
+
+ /**
+  * Registers 10,000 copies of one event from a pool of ten threads, waits for a rollover and
+  * checks that the records read back from the files in the storage directory carry the
+  * consecutive event ids 0 through 9999.
+  */
+ @Test
+ public void testMergeJournals() throws IOException, InterruptedException {
+     final RepositoryConfiguration config = createConfiguration();
+     config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+     repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
+     repo.initialize(getEventReporter(), null, null);
+
+     final Map<String, String> attributes = new HashMap<>();
+
+     final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+     builder.setEventTime(System.currentTimeMillis());
+     builder.setEventType(ProvenanceEventType.RECEIVE);
+     builder.setTransitUri("nifi://unit-test");
+     attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+     builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+     builder.setComponentId("1234");
+     builder.setComponentType("dummy processor");
+
+     final ProvenanceEventRecord record = builder.build();
+
+     final ExecutorService exec = Executors.newFixedThreadPool(10);
+     try {
+         for (int i = 0; i < 10000; i++) {
+             exec.submit(new Runnable() {
+                 @Override
+                 public void run() {
+                     repo.registerEvent(record);
+                 }
+             });
+         }
+     } finally {
+         // Accept no further tasks and let the pool threads exit once the queue is drained,
+         // so the test does not leave ten idle threads behind.
+         exec.shutdown();
+     }
+
+     repo.waitForRollover();
+
+     // Fail with a clear message if some registrations are still outstanding before reading.
+     assertTrue("Not all events were registered in time", exec.awaitTermination(1, TimeUnit.MINUTES));
+
+     final File storageDir = config.getStorageDirectories().get(0);
+     final File[] storedFiles = storageDir.listFiles();
+     assertNotNull(storedFiles); // listFiles() returns null if the directory cannot be listed
+
+     long counter = 0;
+     for (final File file : storedFiles) {
+         if (file.isFile()) {
+             try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
+                 ProvenanceEventRecord r;
+                 while ((r = reader.nextRecord()) != null) {
+                     assertEquals(counter++, r.getEventId());
+                 }
+             }
+         }
+     }
+
+     assertEquals(10000, counter);
+ }
+
+ /**
+  * With the maximum attribute length configured as 50, registers an event carrying a
+  * 75-character attribute value and checks that the stored value is its first 50 characters,
+  * while the shorter {@code uuid} attribute is returned unchanged.
+  */
+ @Test
+ public void testTruncateAttributes() throws IOException, InterruptedException {
+     final RepositoryConfiguration repoConfig = createConfiguration();
+     repoConfig.setMaxAttributeChars(50);
+     repoConfig.setMaxEventFileLife(3, TimeUnit.SECONDS);
+     repo = new MiNiFiPersistentProvenanceRepository(repoConfig, DEFAULT_ROLLOVER_MILLIS);
+     repo.initialize(getEventReporter(), null, null);
+
+     final Map<String, String> attributes = new HashMap<>();
+     attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
+     attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+
+     final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder();
+     eventBuilder.setEventTime(System.currentTimeMillis());
+     eventBuilder.setEventType(ProvenanceEventType.RECEIVE);
+     eventBuilder.setTransitUri("nifi://unit-test");
+     eventBuilder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+     eventBuilder.setComponentId("1234");
+     eventBuilder.setComponentType("dummy processor");
+
+     repo.registerEvent(eventBuilder.build());
+     repo.waitForRollover();
+
+     final ProvenanceEventRecord stored = repo.getEvent(0L);
+     assertNotNull(stored);
+     assertEquals("12345678-0000-0000-0000-012345678912", stored.getAttributes().get("uuid"));
+     assertEquals("12345678901234567890123456789012345678901234567890", stored.getAttributes().get("75chars"));
+ }
+
+
+ /**
+  * Uses a repository whose second call to {@code createWriters} throws, and checks that the
+  * resulting rollover failure surfaces as that exact exception, that a later rollover
+  * succeeds, and that nothing was reported to the event reporter.
+  */
+ @Test
+ public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
+     final RepositoryConfiguration config = createConfiguration();
+     config.setMaxAttributeChars(50);
+     config.setMaxEventFileLife(3, TimeUnit.SECONDS);
+
+     // Create a repo whose second createWriters() call fails; every other call is delegated.
+     final IOException failure = new IOException("Already created writers once. Unit test causing failure.");
+     repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
+         int iterations = 0;
+
+         @Override
+         protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException {
+             if (iterations++ == 1) {
+                 throw failure;
+             } else {
+                 return super.createWriters(config, initialRecordId);
+             }
+         }
+     };
+
+     // initialize with our event reporter
+     repo.initialize(getEventReporter(), null, null);
+
+     // create some events in the journal files.
+     final Map<String, String> attributes = new HashMap<>();
+     attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
+
+     final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
+     builder.setEventTime(System.currentTimeMillis());
+     builder.setEventType(ProvenanceEventType.RECEIVE);
+     builder.setTransitUri("nifi://unit-test");
+     attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
+     builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
+     builder.setComponentId("1234");
+     builder.setComponentType("dummy processor");
+
+     for (int i = 0; i < 50; i++) {
+         final ProvenanceEventRecord event = builder.build();
+         repo.registerEvent(event);
+     }
+
+     // Attempt to rollover but fail to create new writers.
+     try {
+         repo.rolloverWithLock(true);
+         Assert.fail("Expected to get IOException when calling rolloverWithLock");
+     } catch (final IOException ioe) {
+         // Identity check: it must be the injected exception, not some other I/O failure.
+         Assert.assertSame(failure, ioe);
+     }
+
+     // Wait for the first rollover to succeed.
+     repo.waitForRollover();
+
+     // This time when we rollover, we should not have a problem rolling over.
+     repo.rolloverWithLock(true);
+
+     // Ensure that no errors were reported.
+     assertEquals(0, reportedEvents.size());
+ }
+
+ /** Immutable holder for the severity, category and message of one reported event. */
+ private static class ReportedEvent {
+     private final Severity severity;
+     private final String category;
+     private final String message;
+
+     public ReportedEvent(final Severity severity, final String category, final String message) {
+         this.message = message;
+         this.category = category;
+         this.severity = severity;
+     }
+
+     /** @return the severity the event was reported with */
+     public Severity getSeverity() {
+         return this.severity;
+     }
+
+     /** @return the category the event was reported under */
+     public String getCategory() {
+         return this.category;
+     }
+
+     /** @return the message text of the event */
+     public String getMessage() {
+         return this.message;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java b/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java
deleted file mode 100644
index 0dd5f65..0000000
--- a/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-persistent-provenance-repository/src/test/java/org/apache/nifi/provenance/TestMiNiFiPersistentProvenanceRepository.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.provenance;
-
-import org.apache.lucene.queryparser.classic.ParseException;
-import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.provenance.MiNiFiPersistentProvenanceRepository.MethodNotSupportedException;
-import org.apache.nifi.provenance.serialization.RecordReader;
-import org.apache.nifi.provenance.serialization.RecordReaders;
-import org.apache.nifi.provenance.serialization.RecordWriter;
-import org.apache.nifi.reporting.Severity;
-import org.apache.nifi.stream.io.DataOutputStream;
-import org.apache.nifi.util.file.FileUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.zip.GZIPOutputStream;
-
-import static org.apache.nifi.provenance.TestUtil.createFlowFile;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class TestMiNiFiPersistentProvenanceRepository {
-
- @Rule
- public TestName name = new TestName();
-
- private MiNiFiPersistentProvenanceRepository repo;
- private RepositoryConfiguration config;
-
- public static final int DEFAULT_ROLLOVER_MILLIS = 2000;
- private EventReporter eventReporter;
- private List<ReportedEvent> reportedEvents = Collections.synchronizedList(new ArrayList<ReportedEvent>());
-
- private RepositoryConfiguration createConfiguration() {
- config = new RepositoryConfiguration();
- config.addStorageDirectory(new File("target/storage/" + UUID.randomUUID().toString()));
- config.setCompressOnRollover(true);
- config.setMaxEventFileLife(2000L, TimeUnit.SECONDS);
- config.setCompressionBlockBytes(100);
- return config;
- }
-
- @BeforeClass
- public static void setLogLevel() {
- System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.provenance", "DEBUG");
- }
-
- @Before
- public void printTestName() {
- reportedEvents.clear();
- eventReporter = new EventReporter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void reportEvent(Severity severity, String category, String message) {
- reportedEvents.add(new ReportedEvent(severity, category, message));
- System.out.println(severity + " : " + category + " : " + message);
- }
- };
- }
-
- @After
- public void closeRepo() throws IOException {
- if (repo != null) {
- try {
- repo.close();
- } catch (final IOException ioe) {
- }
- }
-
- // Delete all of the storage files. We do this in order to clean up the tons of files that
- // we create but also to ensure that we have closed all of the file handles. If we leave any
- // streams open, for instance, this will throw an IOException, causing our unit test to fail.
- for (final File storageDir : config.getStorageDirectories()) {
- int i;
- for (i = 0; i < 3; i++) {
- try {
- FileUtils.deleteFile(storageDir, true);
- break;
- } catch (final IOException ioe) {
- // if there is a virus scanner, etc. running in the background we may not be able to
- // delete the file. Wait a sec and try again.
- if (i == 2) {
- throw ioe;
- } else {
- try {
- Thread.sleep(1000L);
- } catch (final InterruptedException ie) {
- }
- }
- }
- }
- }
- }
-
-
-
- private EventReporter getEventReporter() {
- return eventReporter;
- }
-
-
- @Test
- public void testAddAndRecover() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileCapacity(1L);
- config.setMaxEventFileLife(1, TimeUnit.SECONDS);
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null);
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("uuid", UUID.randomUUID().toString());
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
- final ProvenanceEventRecord record = builder.build();
-
- for (int i = 0; i < 10; i++) {
- repo.registerEvent(record);
- }
-
- Assert.assertEquals("Did not establish the correct, Max Event Id", 9, repo.getMaxEventId().intValue());
-
- Thread.sleep(1000L);
-
- repo.close();
- Thread.sleep(500L); // Give the repo time to shutdown (i.e., close all file handles, etc.)
-
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null);
- final List<ProvenanceEventRecord> recoveredRecords = repo.getEvents(0L, 12);
-
- Assert.assertEquals("Did not establish the correct, Max Event Id through recovery after reloading", 9, repo.getMaxEventId().intValue());
-
- assertEquals(10, recoveredRecords.size());
- for (int i = 0; i < 10; i++) {
- final ProvenanceEventRecord recovered = recoveredRecords.get(i);
- assertEquals(i, recovered.getEventId());
- assertEquals("nifi://unit-test", recovered.getTransitUri());
- assertEquals(ProvenanceEventType.RECEIVE, recovered.getEventType());
- assertEquals(attributes, recovered.getAttributes());
- }
- }
-
-
- @Test
- public void testCompressOnRollover() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setCompressOnRollover(true);
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", uuid);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- repo.registerEvent(builder.build());
- }
-
- repo.waitForRollover();
- final File storageDir = config.getStorageDirectories().get(0);
- final File compressedLogFile = new File(storageDir, "0.prov.gz");
- assertTrue(compressedLogFile.exists());
- }
-
- @Test(expected = MethodNotSupportedException.class)
- public void testLineageRequestNotSupported() throws IOException, InterruptedException, ParseException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxRecordLife(3, TimeUnit.SECONDS);
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
-
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null);
-
- final String uuid = "00000000-0000-0000-0000-000000000001";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("uuid", uuid);
- attributes.put("filename", "file-" + uuid);
-
- repo.submitLineageComputation(uuid, null);
- }
-
-
- @Test
- public void testCorrectProvenanceEventIdOnRestore() throws IOException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(1, TimeUnit.SECONDS);
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null);
-
- final String uuid = "00000000-0000-0000-0000-000000000000";
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("abc", "xyz");
- attributes.put("xyz", "abc");
- attributes.put("filename", "file-" + uuid);
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", uuid);
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 10; i++) {
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- repo.registerEvent(builder.build());
- }
-
- repo.close();
-
- final MiNiFiPersistentProvenanceRepository secondRepo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- secondRepo.initialize(getEventReporter(), null, null);
-
- try {
- final ProvenanceEventRecord event11 = builder.build();
- secondRepo.registerEvent(event11);
- secondRepo.waitForRollover();
- final ProvenanceEventRecord event11Retrieved = secondRepo.getEvent(10L);
- assertNotNull(event11Retrieved);
- assertEquals(10, event11Retrieved.getEventId());
- Assert.assertEquals(10, secondRepo.getMaxEventId().intValue());
- } finally {
- secondRepo.close();
- }
- }
-
- /**
- * Here the event file is simply corrupted by virtue of not having any event
- * records while having correct headers
- */
- @Test
- public void testWithWithEventFileMissingRecord() throws Exception {
- File eventFile = this.prepCorruptedEventFileTests();
-
- DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
- in.writeUTF("BlahBlah");
- in.writeInt(4);
- in.close();
- assertTrue(eventFile.exists());
-
- final List<ProvenanceEventRecord> events = repo.getEvents(0, 100);
- assertEquals(10, events.size());
- }
-
- /**
- * Here the event file is simply corrupted by virtue of being empty (0
- * bytes)
- */
- @Test
- public void testWithWithEventFileCorrupted() throws Exception {
- File eventFile = this.prepCorruptedEventFileTests();
-
- DataOutputStream in = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(eventFile)));
- in.close();
-
- final List<ProvenanceEventRecord> events = repo.getEvents(0, 100);
- assertEquals(10, events.size());
- }
-
- private File prepCorruptedEventFileTests() throws Exception {
- RepositoryConfiguration config = createConfiguration();
- config.setMaxStorageCapacity(1024L * 1024L);
- config.setMaxEventFileLife(500, TimeUnit.MILLISECONDS);
- config.setMaxEventFileCapacity(1024L * 1024L);
- config.setSearchableFields(new ArrayList<>(SearchableFields.getStandardFields()));
- config.setDesiredIndexSize(10);
-
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null);
-
- String uuid = UUID.randomUUID().toString();
- for (int i = 0; i < 20; i++) {
- ProvenanceEventRecord record = repo.eventBuilder().fromFlowFile(mock(FlowFile.class))
- .setEventType(ProvenanceEventType.CREATE).setComponentId("foo-" + i).setComponentType("myComponent")
- .setFlowFileUUID(uuid).build();
- repo.registerEvent(record);
- if (i == 9) {
- repo.waitForRollover();
- Thread.sleep(2000L);
- }
- }
- repo.waitForRollover();
- File eventFile = new File(config.getStorageDirectories().get(0), "10.prov.gz");
- assertTrue(eventFile.delete());
- return eventFile;
- }
-
- @Test
- public void testBackPressure() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileCapacity(1L); // force rollover on each record.
- config.setJournalCount(1);
-
- final AtomicInteger journalCountRef = new AtomicInteger(0);
-
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- @Override
- protected int getJournalCount() {
- return journalCountRef.get();
- }
- };
- repo.initialize(getEventReporter(), null, null);
-
- final Map<String, String> attributes = new HashMap<>();
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", UUID.randomUUID().toString());
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- // ensure that we can register the events.
- for (int i = 0; i < 10; i++) {
- builder.fromFlowFile(createFlowFile(i, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + i);
- repo.registerEvent(builder.build());
- }
-
- // set number of journals to 6 so that we will block.
- journalCountRef.set(6);
-
- final AtomicLong threadNanos = new AtomicLong(0L);
- final Thread t = new Thread(new Runnable() {
- @Override
- public void run() {
- final long start = System.nanoTime();
- builder.fromFlowFile(createFlowFile(13, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 13);
- repo.registerEvent(builder.build());
- threadNanos.set(System.nanoTime() - start);
- }
- });
- t.start();
-
- Thread.sleep(1500L);
-
- journalCountRef.set(1);
- t.join();
-
- final int threadMillis = (int) TimeUnit.NANOSECONDS.toMillis(threadNanos.get());
- assertTrue(threadMillis > 1200); // use 1200 to account for the fact that the timing is not exact
-
- builder.fromFlowFile(createFlowFile(15, 3000L, attributes));
- attributes.put("uuid", "00000000-0000-0000-0000-00000000000" + 15);
- repo.registerEvent(builder.build());
- }
-
- @Test
- public void testMergeJournals() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null);
-
- final Map<String, String> attributes = new HashMap<>();
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- final ProvenanceEventRecord record = builder.build();
-
- final ExecutorService exec = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 10000; i++) {
- exec.submit(new Runnable() {
- @Override
- public void run() {
- repo.registerEvent(record);
- }
- });
- }
-
- repo.waitForRollover();
-
- final File storageDir = config.getStorageDirectories().get(0);
- long counter = 0;
- for (final File file : storageDir.listFiles()) {
- if (file.isFile()) {
-
- try (RecordReader reader = RecordReaders.newRecordReader(file, null, 2048)) {
- ProvenanceEventRecord r = null;
-
- while ((r = reader.nextRecord()) != null) {
- assertEquals(counter++, r.getEventId());
- }
- }
- }
- }
-
- assertEquals(10000, counter);
- }
-
- @Test
- public void testTruncateAttributes() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxAttributeChars(50);
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS);
- repo.initialize(getEventReporter(), null, null);
-
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- final ProvenanceEventRecord record = builder.build();
- repo.registerEvent(record);
- repo.waitForRollover();
-
- final ProvenanceEventRecord retrieved = repo.getEvent(0L);
- assertNotNull(retrieved);
- assertEquals("12345678-0000-0000-0000-012345678912", retrieved.getAttributes().get("uuid"));
- assertEquals("12345678901234567890123456789012345678901234567890", retrieved.getAttributes().get("75chars"));
- }
-
-
- @Test
- public void testFailureToCreateWriterDoesNotPreventSubsequentRollover() throws IOException, InterruptedException {
- final RepositoryConfiguration config = createConfiguration();
- config.setMaxAttributeChars(50);
- config.setMaxEventFileLife(3, TimeUnit.SECONDS);
-
- // Create a repo that will allow only a single writer to be created.
- final IOException failure = new IOException("Already created writers once. Unit test causing failure.");
- repo = new MiNiFiPersistentProvenanceRepository(config, DEFAULT_ROLLOVER_MILLIS) {
- int iterations = 0;
-
- @Override
- protected RecordWriter[] createWriters(RepositoryConfiguration config, long initialRecordId) throws IOException {
- if (iterations++ == 1) {
- throw failure;
- } else {
- return super.createWriters(config, initialRecordId);
- }
- }
- };
-
- // initialize with our event reporter
- repo.initialize(getEventReporter(), null, null);
-
- // create some events in the journal files.
- final Map<String, String> attributes = new HashMap<>();
- attributes.put("75chars", "123456789012345678901234567890123456789012345678901234567890123456789012345");
-
- final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder();
- builder.setEventTime(System.currentTimeMillis());
- builder.setEventType(ProvenanceEventType.RECEIVE);
- builder.setTransitUri("nifi://unit-test");
- attributes.put("uuid", "12345678-0000-0000-0000-012345678912");
- builder.fromFlowFile(createFlowFile(3L, 3000L, attributes));
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
-
- for (int i = 0; i < 50; i++) {
- final ProvenanceEventRecord event = builder.build();
- repo.registerEvent(event);
- }
-
- // Attempt to rollover but fail to create new writers.
- try {
- repo.rolloverWithLock(true);
- Assert.fail("Expected to get IOException when calling rolloverWithLock");
- } catch (final IOException ioe) {
- assertTrue(ioe == failure);
- }
-
- // Wait for the first rollover to succeed.
- repo.waitForRollover();
-
- // This time when we rollover, we should not have a problem rolling over.
- repo.rolloverWithLock(true);
-
- // Ensure that no errors were reported.
- assertEquals(0, reportedEvents.size());
- }
-
- private static class ReportedEvent {
- private final Severity severity;
- private final String category;
- private final String message;
-
- public ReportedEvent(final Severity severity, final String category, final String message) {
- this.severity = severity;
- this.category = category;
- this.message = message;
- }
-
- public String getCategory() {
- return category;
- }
-
- public String getMessage() {
- return message;
- }
-
- public Severity getSeverity() {
- return severity;
- }
- }
-}
[4/4] nifi-minifi git commit: MINIFI-279: Makes test classes naming
consistent.
Posted by al...@apache.org.
MINIFI-279: Makes test classes naming consistent.
This closes #82.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi-minifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi-minifi/commit/b059afef
Tree: http://git-wip-us.apache.org/repos/asf/nifi-minifi/tree/b059afef
Diff: http://git-wip-us.apache.org/repos/asf/nifi-minifi/diff/b059afef
Branch: refs/heads/master
Commit: b059afef2fab7e59489836d1bdcac01b3b7100b2
Parents: 25298e9
Author: jzonthemtn <jz...@apache.org>
Authored: Thu Apr 27 10:22:56 2017 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu Apr 27 17:03:07 2017 -0400
----------------------------------------------------------------------
.../ConfigurationChangeCoordinatorTest.java | 84 ++
.../TestConfigurationChangeCoordinator.java | 84 --
.../TestWholeConfigDifferentiator.java | 110 ---
.../WholeConfigDifferentiatorTest.java | 110 +++
.../ingestors/FileChangeIngestorTest.java | 171 ++++
.../PullHttpChangeIngestorSSLTest.java | 84 ++
.../ingestors/PullHttpChangeIngestorTest.java | 65 ++
.../ingestors/RestChangeIngestorSSLTest.java | 150 ++++
.../ingestors/RestChangeIngestorTest.java | 57 ++
.../ingestors/TestFileChangeIngestor.java | 171 ----
.../ingestors/TestPullHttpChangeIngestor.java | 65 --
.../TestPullHttpChangeIngestorSSL.java | 84 --
.../ingestors/TestRestChangeIngestor.java | 57 --
.../ingestors/TestRestChangeIngestorSSL.java | 150 ----
.../PullHttpChangeIngestorCommonTest.java | 231 +++++
.../common/RestChangeIngestorCommonTest.java | 127 +++
.../TestPullHttpChangeIngestorCommon.java | 231 -----
.../common/TestRestChangeIngestorCommon.java | 127 ---
.../status/reporters/StatusLoggerTest.java | 209 +++++
.../status/reporters/TestStatusLogger.java | 209 -----
.../bootstrap/util/ConfigTransformerTest.java | 249 +++++-
.../bootstrap/util/TestConfigTransformer.java | 221 -----
.../minifi/commons/status/StatusReportTest.java | 88 ++
.../minifi/commons/status/TestStatusReport.java | 88 --
.../minifi/status/StatusConfigReporterTest.java | 875 +++++++++++++++++++
.../minifi/status/TestStatusConfigReporter.java | 875 -------------------
...iNiFiPersistentProvenanceRepositoryTest.java | 579 ++++++++++++
...estMiNiFiPersistentProvenanceRepository.java | 579 ------------
28 files changed, 3051 insertions(+), 3079 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinatorTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinatorTest.java
new file mode 100644
index 0000000..ad16f72
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ConfigurationChangeCoordinatorTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.mockito.Mockito.verify;
+
+public class ConfigurationChangeCoordinatorTest {
+
+ private ConfigurationChangeCoordinator coordinatorSpy;
+ private Properties properties = new Properties();
+
+ @Before
+ public void setUp() throws Exception {
+ coordinatorSpy = Mockito.spy(new ConfigurationChangeCoordinator());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ coordinatorSpy.close();
+ }
+
+ @Test
+ public void testInit() throws Exception {
+ properties.put("nifi.minifi.notifier.ingestors", "org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor");
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+ coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Collections.singleton(testListener));
+ }
+
+ @Test
+ public void testNotifyListeners() throws Exception {
+ final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+ coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Collections.singleton(testListener));
+
+ Assert.assertEquals("Did not receive the correct number of registered listeners", coordinatorSpy.getChangeListeners().size(), 1);
+
+ coordinatorSpy.notifyListeners(ByteBuffer.allocate(1));
+
+ verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
+ }
+
+ @Test
+ public void testRegisterListener() throws Exception {
+ final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
+ coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Collections.singleton(firstListener));
+
+ Assert.assertEquals("Did not receive the correct number of registered listeners", coordinatorSpy.getChangeListeners().size(), 1);
+
+ coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Arrays.asList(firstListener, firstListener));
+ Assert.assertEquals("Did not receive the correct number of registered listeners", coordinatorSpy.getChangeListeners().size(), 1);
+
+ final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
+ coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Arrays.asList(firstListener, secondListener));
+ Assert.assertEquals("Did not receive the correct number of registered listeners", coordinatorSpy.getChangeListeners().size(), 2);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestConfigurationChangeCoordinator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestConfigurationChangeCoordinator.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestConfigurationChangeCoordinator.java
deleted file mode 100644
index a6882a5..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/TestConfigurationChangeCoordinator.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration;
-
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Properties;
-
-import static org.mockito.Mockito.verify;
-
-public class TestConfigurationChangeCoordinator {
-
- private ConfigurationChangeCoordinator coordinatorSpy;
- private Properties properties = new Properties();
-
- @Before
- public void setUp() throws Exception {
- coordinatorSpy = Mockito.spy(new ConfigurationChangeCoordinator());
- }
-
- @After
- public void tearDown() throws Exception {
- coordinatorSpy.close();
- }
-
- @Test
- public void testInit() throws Exception {
- properties.put("nifi.minifi.notifier.ingestors", "org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor");
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
- coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Collections.singleton(testListener));
- }
-
- @Test
- public void testNotifyListeners() throws Exception {
- final ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
- coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Collections.singleton(testListener));
-
- Assert.assertEquals("Did not receive the correct number of registered listeners", coordinatorSpy.getChangeListeners().size(), 1);
-
- coordinatorSpy.notifyListeners(ByteBuffer.allocate(1));
-
- verify(testListener, Mockito.atMost(1)).handleChange(Mockito.any(InputStream.class));
- }
-
- @Test
- public void testRegisterListener() throws Exception {
- final ConfigurationChangeListener firstListener = Mockito.mock(ConfigurationChangeListener.class);
- coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Collections.singleton(firstListener));
-
- Assert.assertEquals("Did not receive the correct number of registered listeners", coordinatorSpy.getChangeListeners().size(), 1);
-
- coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Arrays.asList(firstListener, firstListener));
- Assert.assertEquals("Did not receive the correct number of registered listeners", coordinatorSpy.getChangeListeners().size(), 1);
-
- final ConfigurationChangeListener secondListener = Mockito.mock(ConfigurationChangeListener.class);
- coordinatorSpy.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), Arrays.asList(firstListener, secondListener));
- Assert.assertEquals("Did not receive the correct number of registered listeners", coordinatorSpy.getChangeListeners().size(), 2);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/TestWholeConfigDifferentiator.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/TestWholeConfigDifferentiator.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/TestWholeConfigDifferentiator.java
deleted file mode 100644
index 9dabea3..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/TestWholeConfigDifferentiator.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
-
-import okhttp3.Request;
-import org.apache.commons.io.FileUtils;
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-public class TestWholeConfigDifferentiator {
-
- public static final Path newConfigPath = Paths.get("./src/test/resources/config.yml");
- public static final Path defaultConfigPath = Paths.get("./src/test/resources/default.yml");
-
- public static ByteBuffer defaultConfigBuffer;
- public static ByteBuffer newConfigBuffer;
- public static Properties properties = new Properties();
- public static ConfigurationFileHolder configurationFileHolder;
-
- public static Request dummyRequest;
-
- @BeforeClass
- public static void beforeClass() throws IOException {
- dummyRequest = new Request.Builder()
- .get()
- .url("https://nifi.apache.org/index.html")
- .build();
-
- defaultConfigBuffer = ByteBuffer.wrap(FileUtils.readFileToByteArray(defaultConfigPath.toFile()));
- newConfigBuffer = ByteBuffer.wrap(FileUtils.readFileToByteArray(newConfigPath.toFile()));
-
- configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
-
- when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(defaultConfigBuffer));
- }
-
- @Before
- public void beforeEach() {
- }
-
- // InputStream differentiator methods
-
- @Test
- public void TestSameInputStream() throws IOException {
- Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
- differentiator.initialize(properties, configurationFileHolder);
-
- FileInputStream fileInputStream = new FileInputStream(defaultConfigPath.toFile());
- assertFalse(differentiator.isNew(fileInputStream));
- }
-
- @Test
- public void TestNewInputStream() throws IOException {
- Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
- differentiator.initialize(properties, configurationFileHolder);
-
- FileInputStream fileInputStream = new FileInputStream(newConfigPath.toFile());
- assertTrue(differentiator.isNew(fileInputStream));
- }
-
- // Bytebuffer differentiator methods
-
- @Test
- public void TestSameByteBuffer() throws IOException {
- Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
- differentiator.initialize(properties, configurationFileHolder);
-
- assertFalse(differentiator.isNew(defaultConfigBuffer));
- }
-
- @Test
- public void TestNewByteBuffer() throws IOException {
- Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
- differentiator.initialize(properties, configurationFileHolder);
-
- assertTrue(differentiator.isNew(newConfigBuffer));
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java
new file mode 100644
index 0000000..d2115a2
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/differentiators/WholeConfigDifferentiatorTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.differentiators;
+
+import okhttp3.Request;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+public class WholeConfigDifferentiatorTest {
+
+ public static final Path newConfigPath = Paths.get("./src/test/resources/config.yml");
+ public static final Path defaultConfigPath = Paths.get("./src/test/resources/default.yml");
+
+ public static ByteBuffer defaultConfigBuffer;
+ public static ByteBuffer newConfigBuffer;
+ public static Properties properties = new Properties();
+ public static ConfigurationFileHolder configurationFileHolder;
+
+ public static Request dummyRequest;
+
+ @BeforeClass
+ public static void beforeClass() throws IOException {
+ dummyRequest = new Request.Builder()
+ .get()
+ .url("https://nifi.apache.org/index.html")
+ .build();
+
+ defaultConfigBuffer = ByteBuffer.wrap(FileUtils.readFileToByteArray(defaultConfigPath.toFile()));
+ newConfigBuffer = ByteBuffer.wrap(FileUtils.readFileToByteArray(newConfigPath.toFile()));
+
+ configurationFileHolder = Mockito.mock(ConfigurationFileHolder.class);
+
+ when(configurationFileHolder.getConfigFileReference()).thenReturn(new AtomicReference<>(defaultConfigBuffer));
+ }
+
+ @Before
+ public void beforeEach() {
+ }
+
+ // InputStream differentiator methods
+
+ @Test
+ public void TestSameInputStream() throws IOException {
+ Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+ differentiator.initialize(properties, configurationFileHolder);
+
+ FileInputStream fileInputStream = new FileInputStream(defaultConfigPath.toFile());
+ assertFalse(differentiator.isNew(fileInputStream));
+ }
+
+ @Test
+ public void TestNewInputStream() throws IOException {
+ Differentiator<InputStream> differentiator = WholeConfigDifferentiator.getInputStreamDifferentiator();
+ differentiator.initialize(properties, configurationFileHolder);
+
+ FileInputStream fileInputStream = new FileInputStream(newConfigPath.toFile());
+ assertTrue(differentiator.isNew(fileInputStream));
+ }
+
+ // Bytebuffer differentiator methods
+
+ @Test
+ public void TestSameByteBuffer() throws IOException {
+ Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+ differentiator.initialize(properties, configurationFileHolder);
+
+ assertFalse(differentiator.isNew(defaultConfigBuffer));
+ }
+
+ @Test
+ public void TestNewByteBuffer() throws IOException {
+ Differentiator<ByteBuffer> differentiator = WholeConfigDifferentiator.getByteBufferDifferentiator();
+ differentiator.initialize(properties, configurationFileHolder);
+
+ assertTrue(differentiator.isNew(newConfigBuffer));
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestorTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestorTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestorTest.java
new file mode 100644
index 0000000..7cac488
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/FileChangeIngestorTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class FileChangeIngestorTest {
+
+ private static final String CONFIG_FILENAME = "config.yml";
+ private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
+
+ private FileChangeIngestor notifierSpy;
+ private WatchService mockWatchService;
+ private Properties testProperties;
+ private Differentiator<InputStream> mockDifferentiator;
+ private ConfigurationChangeNotifier testNotifier;
+
+ @Before
+ public void setUp() throws Exception {
+ mockWatchService = Mockito.mock(WatchService.class);
+ notifierSpy = Mockito.spy(new FileChangeIngestor());
+ mockDifferentiator = Mockito.mock(Differentiator.class);
+ testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+
+ notifierSpy.setConfigFilePath(Paths.get(TEST_CONFIG_PATH));
+ notifierSpy.setWatchService(mockWatchService);
+ notifierSpy.setDifferentiator(mockDifferentiator);
+ notifierSpy.setConfigurationChangeNotifier(testNotifier);
+
+ testProperties = new Properties();
+ testProperties.put(FileChangeIngestor.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
+ testProperties.put(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY, FileChangeIngestor.DEFAULT_POLLING_PERIOD_INTERVAL);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ notifierSpy.close();
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testInitialize_invalidFile() throws Exception {
+ testProperties.put(FileChangeIngestor.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
+ notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
+ }
+
+ @Test
+ public void testInitialize_validFile() throws Exception {
+ notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testInitialize_invalidPollingPeriod() throws Exception {
+ testProperties.put(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY, "abc");
+ notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
+ }
+
+ @Test
+ public void testInitialize_useDefaultPolling() throws Exception {
+ testProperties.remove(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY);
+ notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
+ }
+
+ /* Verify handleChange events */
+ @Test
+ public void testTargetChangedNoModification() throws Exception {
+ when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
+ final ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+
+ // In this case the WatchKey is null because there were no events found
+ establishMockEnvironmentForChangeTests(testNotifier, null);
+
+ verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any(ByteBuffer.class));
+ }
+
+ @Test
+ public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
+ when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
+ final ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+
+ // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
+ final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
+
+ establishMockEnvironmentForChangeTests(testNotifier, mockWatchKey);
+
+ notifierSpy.targetChanged();
+
+ verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any(ByteBuffer.class));
+ }
+
+ @Test
+ public void testTargetChangedWithModificationEvent() throws Exception {
+ when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
+
+ final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
+ // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
+ establishMockEnvironmentForChangeTests(testNotifier, mockWatchKey);
+
+ // Invoke the method of interest
+ notifierSpy.run();
+
+ verify(mockWatchService, Mockito.atLeastOnce()).poll();
+ verify(testNotifier, Mockito.atLeastOnce()).notifyListeners(Mockito.any(ByteBuffer.class));
+ }
+
+ /* Helper methods to establish mock environment */
+ private WatchKey createMockWatchKeyForPath(String configFilePath) {
+ final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
+ final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
+ when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
+ when(mockWatchKey.reset()).thenReturn(true);
+
+ final Iterator mockIterator = Mockito.mock(Iterator.class);
+ when(mockWatchEvents.iterator()).thenReturn(mockIterator);
+
+ final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
+ when(mockIterator.hasNext()).thenReturn(true, false);
+ when(mockIterator.next()).thenReturn(mockWatchEvent);
+
+ // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
+ when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
+ when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
+
+ return mockWatchKey;
+ }
+
+ private void establishMockEnvironmentForChangeTests(ConfigurationChangeNotifier configurationChangeNotifier, final WatchKey watchKey) throws Exception {
+ // Establish the file mock and its parent directory
+ final Path mockConfigFilePath = Mockito.mock(Path.class);
+ final Path mockConfigFileParentPath = Mockito.mock(Path.class);
+
+ // When getting the parent of the file, get the directory
+ when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
+
+ when(mockWatchService.poll()).thenReturn(watchKey);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
new file mode 100644
index 0000000..195cf60
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorSSLTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.PullHttpChangeIngestorCommonTest;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import java.util.Properties;
+
+/**
+ * Runs the shared pull-HTTP ingestor tests against a Jetty server that is reachable
+ * through a TLS connector requiring client authentication.
+ */
+public class PullHttpChangeIngestorSSLTest extends PullHttpChangeIngestorCommonTest {
+
+    private static final String KEYSTORE_PATH = "./src/test/resources/localhost-ks.jks";
+    private static final String TRUSTSTORE_PATH = "./src/test/resources/localhost-ts.jks";
+    private static final String STORE_PASSWORD = "localtest";
+    private static final String STORE_TYPE = "JKS";
+
+    /**
+     * Initializes the common test fixture, attaches a TLS connector to the shared
+     * Jetty server and starts it.
+     *
+     * @throws IllegalStateException if Jetty does not report itself as started
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        PullHttpChangeIngestorCommonTest.init();
+
+        final SslContextFactory sslContextFactory = new SslContextFactory();
+        sslContextFactory.setKeyStorePath(KEYSTORE_PATH);
+        sslContextFactory.setKeyStorePassword(STORE_PASSWORD);
+        sslContextFactory.setKeyStoreType(STORE_TYPE);
+        sslContextFactory.setTrustStorePath(TRUSTSTORE_PATH);
+        sslContextFactory.setTrustStorePassword(STORE_PASSWORD);
+        sslContextFactory.setTrustStoreType(STORE_TYPE);
+        sslContextFactory.setNeedClientAuth(true);
+
+        // Port 0 is requested here; the port actually bound is read back in pullHttpChangeIngestorInit
+        final ServerConnector httpsConnector = new ServerConnector(jetty, sslContextFactory);
+        httpsConnector.setPort(0);
+        httpsConnector.setHost("localhost");
+
+        // Generous idle timeout: heavily loaded build machines can be slow to respond
+        httpsConnector.setIdleTimeout(30000L);
+
+        jetty.addConnector(httpsConnector);
+        jetty.start();
+
+        Thread.sleep(1000);
+
+        if (!jetty.isStarted()) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+    }
+
+    /**
+     * Adds the key/trust store settings and the server's host and port to the given
+     * properties, then creates and initializes the ingestor under test.
+     *
+     * @param properties the properties to complete and pass to the ingestor
+     */
+    @Override
+    public void pullHttpChangeIngestorInit(Properties properties) {
+        properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_LOCATION_KEY, TRUSTSTORE_PATH);
+        properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_PASSWORD_KEY, STORE_PASSWORD);
+        properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_TYPE_KEY, STORE_TYPE);
+        properties.setProperty(PullHttpChangeIngestor.KEYSTORE_LOCATION_KEY, KEYSTORE_PATH);
+        properties.setProperty(PullHttpChangeIngestor.KEYSTORE_PASSWORD_KEY, STORE_PASSWORD);
+        properties.setProperty(PullHttpChangeIngestor.KEYSTORE_TYPE_KEY, STORE_TYPE);
+
+        // Point the ingestor at whichever port the first Jetty connector is listening on
+        final ServerConnector firstConnector = (ServerConnector) jetty.getConnectors()[0];
+        port = firstConnector.getLocalPort();
+        properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
+        properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
+
+        pullHttpChangeIngestor = new PullHttpChangeIngestor();
+        pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+        pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
new file mode 100644
index 0000000..f39fbdf
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/PullHttpChangeIngestorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.PullHttpChangeIngestorCommonTest;
+import org.eclipse.jetty.server.ServerConnector;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import java.util.Properties;
+
+/**
+ * Runs the shared pull-HTTP ingestor tests against a Jetty server that is reachable
+ * through a plain HTTP connector.
+ */
+public class PullHttpChangeIngestorTest extends PullHttpChangeIngestorCommonTest {
+
+    /**
+     * Initializes the common test fixture, attaches a plain HTTP connector to the
+     * shared Jetty server and starts it.
+     *
+     * @throws IllegalStateException if Jetty does not report itself as started
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        PullHttpChangeIngestorCommonTest.init();
+
+        // Port 0 is requested here; the port actually bound is read back in pullHttpChangeIngestorInit
+        final ServerConnector httpConnector = new ServerConnector(jetty);
+        httpConnector.setPort(0);
+        httpConnector.setHost("localhost");
+        httpConnector.setIdleTimeout(3000L);
+
+        jetty.addConnector(httpConnector);
+        jetty.start();
+
+        Thread.sleep(1000);
+
+        if (!jetty.isStarted()) {
+            throw new IllegalStateException("Jetty server not started");
+        }
+    }
+
+    /**
+     * Adds the server's host, port and the polling period to the given properties,
+     * then creates and initializes the ingestor under test.
+     *
+     * @param properties the properties to complete and pass to the ingestor
+     */
+    @Override
+    public void pullHttpChangeIngestorInit(Properties properties) {
+        // Point the ingestor at whichever port the first Jetty connector is listening on
+        final ServerConnector firstConnector = (ServerConnector) jetty.getConnectors()[0];
+        port = firstConnector.getLocalPort();
+
+        properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
+        properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
+        properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000");
+
+        pullHttpChangeIngestor = new PullHttpChangeIngestor();
+        pullHttpChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+        pullHttpChangeIngestor.setDifferentiator(mockDifferentiator);
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorSSLTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorSSLTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorSSLTest.java
new file mode 100644
index 0000000..dada8a5
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorSSLTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+
+import okhttp3.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.RestChangeIngestorCommonTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.UnrecoverableKeyException;
+import java.security.cert.CertificateException;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Runs the shared REST ingestor tests against a {@link RestChangeIngestor} configured
+ * with a keystore and truststore, using an OkHttp client set up with the same stores.
+ */
+public class RestChangeIngestorSSLTest extends RestChangeIngestorCommonTest {
+
+    private static final String KEYSTORE_LOCATION = "./src/test/resources/localhost-ks.jks";
+    private static final String TRUSTSTORE_LOCATION = "./src/test/resources/localhost-ts.jks";
+    private static final String STORE_PASSWORD = "localtest";
+    private static final String STORE_TYPE = "JKS";
+
+    /**
+     * Starts the ingestor with its SSL properties and builds the HTTPS client used by
+     * the inherited tests.
+     *
+     * @throws IllegalStateException if the trust manager factory yields no X509 trust manager
+     */
+    @BeforeClass
+    public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
+        Properties properties = new Properties();
+        properties.setProperty(RestChangeIngestor.TRUSTSTORE_LOCATION_KEY, TRUSTSTORE_LOCATION);
+        properties.setProperty(RestChangeIngestor.TRUSTSTORE_PASSWORD_KEY, STORE_PASSWORD);
+        properties.setProperty(RestChangeIngestor.TRUSTSTORE_TYPE_KEY, STORE_TYPE);
+        properties.setProperty(RestChangeIngestor.KEYSTORE_LOCATION_KEY, KEYSTORE_LOCATION);
+        properties.setProperty(RestChangeIngestor.KEYSTORE_PASSWORD_KEY, STORE_PASSWORD);
+        properties.setProperty(RestChangeIngestor.KEYSTORE_TYPE_KEY, STORE_TYPE);
+        properties.setProperty(RestChangeIngestor.NEED_CLIENT_AUTH_KEY, "false");
+
+        restChangeIngestor = new RestChangeIngestor();
+
+        testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+        ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        when(testListener.getDescriptor()).thenReturn("MockChangeListener");
+        when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
+
+        restChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
+        restChangeIngestor.setDifferentiator(mockDifferentiator);
+        restChangeIngestor.start();
+
+        // Client key material
+        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+        keyManagerFactory.init(readKeyStore(KEYSTORE_LOCATION), STORE_PASSWORD.toCharArray());
+
+        // Client trust material
+        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
+        trustManagerFactory.init(readKeyStore(TRUSTSTORE_LOCATION));
+
+        // Guard the array itself before indexing it, and the element type before casting it
+        final TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
+        if (trustManagers == null || trustManagers.length == 0 || !(trustManagers[0] instanceof X509TrustManager)) {
+            throw new IllegalStateException("No X509TrustManager available from the trust manager factory");
+        }
+        final X509TrustManager x509TrustManager = (X509TrustManager) trustManagers[0];
+
+        // No fallback to SSLContext.getDefault(): the default context cannot be re-initialized,
+        // so a missing "TLS" algorithm is reported directly through NoSuchAlgorithmException.
+        final SSLContext sslContext = SSLContext.getInstance("TLS");
+        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagers, null);
+
+        final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+        final OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
+        clientBuilder.sslSocketFactory(sslSocketFactory, x509TrustManager);
+
+        Thread.sleep(1000);
+        url = restChangeIngestor.getURI().toURL().toString();
+        client = clientBuilder.build();
+    }
+
+    /**
+     * Closes the ingestor and drops the shared client reference.
+     */
+    @AfterClass
+    public static void stop() throws Exception {
+        restChangeIngestor.close();
+        client = null;
+    }
+
+    /**
+     * Loads a JKS store protected by the shared test password.
+     *
+     * @param path location of the store file
+     * @return the loaded key store
+     * @throws IOException if the file cannot be read or the password does not match
+     */
+    private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
+        final KeyStore keyStore = KeyStore.getInstance(STORE_TYPE);
+
+        // try-with-resources closes the stream even when loading fails
+        try (FileInputStream storeStream = new FileInputStream(path)) {
+            keyStore.load(storeStream, STORE_PASSWORD.toCharArray());
+        }
+        return keyStore;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorTest.java
new file mode 100644
index 0000000..5af2d33
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/RestChangeIngestorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
+
+
+import okhttp3.OkHttpClient;
+import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.RestChangeIngestorCommonTest;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.mockito.Mockito;
+
+import java.net.MalformedURLException;
+import java.util.Properties;
+
+
+/**
+ * Runs the shared REST ingestor tests against a {@link RestChangeIngestor} started
+ * with an empty set of properties and a default OkHttp client.
+ */
+public class RestChangeIngestorTest extends RestChangeIngestorCommonTest {
+
+    /**
+     * Starts the ingestor with mocked collaborators and records the URL the
+     * inherited tests send their requests to.
+     */
+    @BeforeClass
+    public static void setUp() throws InterruptedException, MalformedURLException {
+        final Properties emptyProperties = new Properties();
+        restChangeIngestor = new RestChangeIngestor();
+
+        testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+        final ConfigurationFileHolder fileHolder = Mockito.mock(ConfigurationFileHolder.class);
+
+        restChangeIngestor.initialize(emptyProperties, fileHolder, testNotifier);
+        restChangeIngestor.setDifferentiator(mockDifferentiator);
+        restChangeIngestor.start();
+
+        client = new OkHttpClient();
+
+        url = restChangeIngestor.getURI().toURL().toString();
+        Thread.sleep(1000);
+    }
+
+    /**
+     * Closes the ingestor and drops the shared client reference.
+     */
+    @AfterClass
+    public static void stop() throws Exception {
+        restChangeIngestor.close();
+        client = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestFileChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestFileChangeIngestor.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestFileChangeIngestor.java
deleted file mode 100644
index 9817e7e..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestFileChangeIngestor.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
-
-import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-public class TestFileChangeIngestor {
-
- private static final String CONFIG_FILENAME = "config.yml";
- private static final String TEST_CONFIG_PATH = "src/test/resources/config.yml";
-
- private FileChangeIngestor notifierSpy;
- private WatchService mockWatchService;
- private Properties testProperties;
- private Differentiator<InputStream> mockDifferentiator;
- private ConfigurationChangeNotifier testNotifier;
-
- @Before
- public void setUp() throws Exception {
- mockWatchService = Mockito.mock(WatchService.class);
- notifierSpy = Mockito.spy(new FileChangeIngestor());
- mockDifferentiator = Mockito.mock(Differentiator.class);
- testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
-
- notifierSpy.setConfigFilePath(Paths.get(TEST_CONFIG_PATH));
- notifierSpy.setWatchService(mockWatchService);
- notifierSpy.setDifferentiator(mockDifferentiator);
- notifierSpy.setConfigurationChangeNotifier(testNotifier);
-
- testProperties = new Properties();
- testProperties.put(FileChangeIngestor.CONFIG_FILE_PATH_KEY, TEST_CONFIG_PATH);
- testProperties.put(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY, FileChangeIngestor.DEFAULT_POLLING_PERIOD_INTERVAL);
- }
-
- @After
- public void tearDown() throws Exception {
- notifierSpy.close();
- }
-
- @Test(expected = IllegalStateException.class)
- public void testInitialize_invalidFile() throws Exception {
- testProperties.put(FileChangeIngestor.CONFIG_FILE_PATH_KEY, "/land/of/make/believe");
- notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
- }
-
- @Test
- public void testInitialize_validFile() throws Exception {
- notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
- }
-
- @Test(expected = IllegalStateException.class)
- public void testInitialize_invalidPollingPeriod() throws Exception {
- testProperties.put(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY, "abc");
- notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
- }
-
- @Test
- public void testInitialize_useDefaultPolling() throws Exception {
- testProperties.remove(FileChangeIngestor.POLLING_PERIOD_INTERVAL_KEY);
- notifierSpy.initialize(testProperties, Mockito.mock(ConfigurationFileHolder.class), Mockito.mock(ConfigurationChangeNotifier.class));
- }
-
- /* Verify handleChange events */
- @Test
- public void testTargetChangedNoModification() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
- final ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
-
- // In this case the WatchKey is null because there were no events found
- establishMockEnvironmentForChangeTests(testNotifier, null);
-
- verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any(ByteBuffer.class));
- }
-
- @Test
- public void testTargetChangedWithModificationEvent_nonConfigFile() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
- final ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
-
- // In this case, we receive a trigger event for the directory monitored, but it was another file not being monitored
- final WatchKey mockWatchKey = createMockWatchKeyForPath("footage_not_found.yml");
-
- establishMockEnvironmentForChangeTests(testNotifier, mockWatchKey);
-
- notifierSpy.targetChanged();
-
- verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any(ByteBuffer.class));
- }
-
- @Test
- public void testTargetChangedWithModificationEvent() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
-
- final WatchKey mockWatchKey = createMockWatchKeyForPath(CONFIG_FILENAME);
- // Provided as a spy to allow injection of mock objects for some tests when dealing with the finalized FileSystems class
- establishMockEnvironmentForChangeTests(testNotifier, mockWatchKey);
-
- // Invoke the method of interest
- notifierSpy.run();
-
- verify(mockWatchService, Mockito.atLeastOnce()).poll();
- verify(testNotifier, Mockito.atLeastOnce()).notifyListeners(Mockito.any(ByteBuffer.class));
- }
-
- /* Helper methods to establish mock environment */
- private WatchKey createMockWatchKeyForPath(String configFilePath) {
- final WatchKey mockWatchKey = Mockito.mock(WatchKey.class);
- final List<WatchEvent<?>> mockWatchEvents = (List<WatchEvent<?>>) Mockito.mock(List.class);
- when(mockWatchKey.pollEvents()).thenReturn(mockWatchEvents);
- when(mockWatchKey.reset()).thenReturn(true);
-
- final Iterator mockIterator = Mockito.mock(Iterator.class);
- when(mockWatchEvents.iterator()).thenReturn(mockIterator);
-
- final WatchEvent mockWatchEvent = Mockito.mock(WatchEvent.class);
- when(mockIterator.hasNext()).thenReturn(true, false);
- when(mockIterator.next()).thenReturn(mockWatchEvent);
-
- // In this case, we receive a trigger event for the directory monitored, and it was the file monitored
- when(mockWatchEvent.context()).thenReturn(Paths.get(configFilePath));
- when(mockWatchEvent.kind()).thenReturn(ENTRY_MODIFY);
-
- return mockWatchKey;
- }
-
- private void establishMockEnvironmentForChangeTests(ConfigurationChangeNotifier configurationChangeNotifier, final WatchKey watchKey) throws Exception {
- // Establish the file mock and its parent directory
- final Path mockConfigFilePath = Mockito.mock(Path.class);
- final Path mockConfigFileParentPath = Mockito.mock(Path.class);
-
- // When getting the parent of the file, get the directory
- when(mockConfigFilePath.getParent()).thenReturn(mockConfigFileParentPath);
-
- when(mockWatchService.poll()).thenReturn(watchKey);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestor.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestor.java
deleted file mode 100644
index f5c6806..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestor.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
-
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.TestPullHttpChangeIngestorCommon;
-import org.eclipse.jetty.server.ServerConnector;
-import org.junit.BeforeClass;
-import org.mockito.Mockito;
-
-import java.util.Properties;
-
-/**
- * Runs the shared pull-HTTP ingestor tests against a plain HTTP Jetty connector.
- */
-public class TestPullHttpChangeIngestor extends TestPullHttpChangeIngestorCommon {
-
-    /**
-     * Initialises the shared Jetty server, attaches one HTTP connector on localhost and starts it.
-     */
-    @BeforeClass
-    public static void setUp() throws Exception {
-        TestPullHttpChangeIngestorCommon.init();
-
-        final ServerConnector connector = new ServerConnector(jetty);
-        connector.setHost("localhost");
-        // Port 0 requests an ephemeral port; the bound port is read back in pullHttpChangeIngestorInit
-        connector.setPort(0);
-        connector.setIdleTimeout(3000L);
-
-        jetty.addConnector(connector);
-        jetty.start();
-
-        Thread.sleep(1000);
-
-        final boolean started = jetty.isStarted();
-        if (!started) {
-            throw new IllegalStateException("Jetty server not started");
-        }
-    }
-
-    /**
-     * Builds a fresh ingestor that pulls from the Jetty connector started in {@link #setUp()}.
-     */
-    @Override
-    public void pullHttpChangeIngestorInit(Properties properties) {
-        final ServerConnector connector = (ServerConnector) jetty.getConnectors()[0];
-        port = connector.getLocalPort();
-
-        properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
-        properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
-        properties.put(PullHttpChangeIngestor.PULL_HTTP_POLLING_PERIOD_KEY, "30000");
-
-        final PullHttpChangeIngestor ingestor = new PullHttpChangeIngestor();
-        pullHttpChangeIngestor = ingestor;
-        ingestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
-        ingestor.setDifferentiator(mockDifferentiator);
-    }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestorSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestorSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestorSSL.java
deleted file mode 100644
index 340a131..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestPullHttpChangeIngestorSSL.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
-
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.TestPullHttpChangeIngestorCommon;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.junit.BeforeClass;
-import org.mockito.Mockito;
-
-import java.util.Properties;
-
-/**
- * Runs the shared pull-HTTP ingestor tests against a Jetty connector secured with an
- * {@link SslContextFactory} that requires client authentication.
- */
-public class TestPullHttpChangeIngestorSSL extends TestPullHttpChangeIngestorCommon {
-
-    /**
-     * Initialises the shared Jetty server, attaches one SSL connector on localhost and starts it.
-     */
-    @BeforeClass
-    public static void setUp() throws Exception {
-        TestPullHttpChangeIngestorCommon.init();
-
-        // Server-side key and trust material taken from the test resources
-        final SslContextFactory sslContextFactory = new SslContextFactory();
-        sslContextFactory.setKeyStoreType("JKS");
-        sslContextFactory.setKeyStorePath("./src/test/resources/localhost-ks.jks");
-        sslContextFactory.setKeyStorePassword("localtest");
-        sslContextFactory.setTrustStoreType("JKS");
-        sslContextFactory.setTrustStorePath("./src/test/resources/localhost-ts.jks");
-        sslContextFactory.setTrustStorePassword("localtest");
-        sslContextFactory.setNeedClientAuth(true);
-
-        final ServerConnector connector = new ServerConnector(jetty, sslContextFactory);
-        connector.setHost("localhost");
-        connector.setPort(0);
-
-        // Severely taxed environments may have significant delays when executing.
-        connector.setIdleTimeout(30000L);
-
-        jetty.addConnector(connector);
-        jetty.start();
-
-        Thread.sleep(1000);
-
-        final boolean started = jetty.isStarted();
-        if (!started) {
-            throw new IllegalStateException("Jetty server not started");
-        }
-    }
-
-    /**
-     * Builds a fresh ingestor configured with the test keystore and truststore and pointed at
-     * the Jetty connector started in {@link #setUp()}.
-     */
-    @Override
-    public void pullHttpChangeIngestorInit(Properties properties) {
-        properties.setProperty(PullHttpChangeIngestor.KEYSTORE_LOCATION_KEY, "./src/test/resources/localhost-ks.jks");
-        properties.setProperty(PullHttpChangeIngestor.KEYSTORE_PASSWORD_KEY, "localtest");
-        properties.setProperty(PullHttpChangeIngestor.KEYSTORE_TYPE_KEY, "JKS");
-        properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_LOCATION_KEY, "./src/test/resources/localhost-ts.jks");
-        properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_PASSWORD_KEY, "localtest");
-        properties.setProperty(PullHttpChangeIngestor.TRUSTSTORE_TYPE_KEY, "JKS");
-
-        final ServerConnector connector = (ServerConnector) jetty.getConnectors()[0];
-        port = connector.getLocalPort();
-        properties.put(PullHttpChangeIngestor.PORT_KEY, String.valueOf(port));
-        properties.put(PullHttpChangeIngestor.HOST_KEY, "localhost");
-
-        final PullHttpChangeIngestor ingestor = new PullHttpChangeIngestor();
-        pullHttpChangeIngestor = ingestor;
-        ingestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
-        ingestor.setDifferentiator(mockDifferentiator);
-    }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestor.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestor.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestor.java
deleted file mode 100644
index 86a6b4e..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
-
-
-import okhttp3.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
-import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.TestRestChangeIngestorCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.mockito.Mockito;
-
-import java.net.MalformedURLException;
-import java.util.Properties;
-
-
-/**
- * Runs the shared REST ingestor tests against an ingestor started with empty properties.
- */
-public class TestRestChangeIngestor extends TestRestChangeIngestorCommon {
-
-    /**
-     * Starts a {@link RestChangeIngestor} with a mocked notifier and records its URL for the shared tests.
-     */
-    @BeforeClass
-    public static void setUp() throws InterruptedException, MalformedURLException {
-        final Properties emptyProperties = new Properties();
-        final RestChangeIngestor ingestor = new RestChangeIngestor();
-        restChangeIngestor = ingestor;
-
-        testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
-
-        final ConfigurationFileHolder fileHolder = Mockito.mock(ConfigurationFileHolder.class);
-        ingestor.initialize(emptyProperties, fileHolder, testNotifier);
-        ingestor.setDifferentiator(mockDifferentiator);
-        ingestor.start();
-
-        client = new OkHttpClient();
-
-        url = ingestor.getURI().toURL().toString();
-        Thread.sleep(1000);
-    }
-
-    /**
-     * Closes the ingestor and drops the HTTP client reference once all tests have run.
-     */
-    @AfterClass
-    public static void stop() throws Exception {
-        restChangeIngestor.close();
-        client = null;
-    }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestorSSL.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestorSSL.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestorSSL.java
deleted file mode 100644
index debe772..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/TestRestChangeIngestorSSL.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.ingestors;
-
-
-import okhttp3.OkHttpClient;
-import org.apache.nifi.minifi.bootstrap.ConfigurationFileHolder;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
-import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
-import org.apache.nifi.minifi.bootstrap.configuration.ingestors.common.TestRestChangeIngestorCommon;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.mockito.Mockito;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509TrustManager;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.security.KeyManagementException;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Collections;
-import java.util.Properties;
-
-import static org.mockito.Mockito.when;
-
-
-/**
- * Runs the shared REST ingestor tests against an ingestor configured with a keystore and
- * truststore, using an OkHttp client that trusts the same test certificates.
- */
-public class TestRestChangeIngestorSSL extends TestRestChangeIngestorCommon {
-
-    private static final String TEST_KEYSTORE_PATH = "./src/test/resources/localhost-ks.jks";
-    private static final String TEST_TRUSTSTORE_PATH = "./src/test/resources/localhost-ts.jks";
-    private static final String TEST_STORE_PASSWORD = "localtest";
-    private static final String TEST_STORE_TYPE = "JKS";
-
-    /**
-     * Starts the ingestor with SSL properties and builds an HTTP client whose socket factory
-     * is backed by the test keystore and truststore.
-     */
-    @BeforeClass
-    public static void setUpHttps() throws CertificateException, NoSuchAlgorithmException, KeyStoreException, IOException, UnrecoverableKeyException, KeyManagementException, InterruptedException {
-        Properties properties = new Properties();
-        properties.setProperty(RestChangeIngestor.TRUSTSTORE_LOCATION_KEY, TEST_TRUSTSTORE_PATH);
-        properties.setProperty(RestChangeIngestor.TRUSTSTORE_PASSWORD_KEY, TEST_STORE_PASSWORD);
-        properties.setProperty(RestChangeIngestor.TRUSTSTORE_TYPE_KEY, TEST_STORE_TYPE);
-        properties.setProperty(RestChangeIngestor.KEYSTORE_LOCATION_KEY, TEST_KEYSTORE_PATH);
-        properties.setProperty(RestChangeIngestor.KEYSTORE_PASSWORD_KEY, TEST_STORE_PASSWORD);
-        properties.setProperty(RestChangeIngestor.KEYSTORE_TYPE_KEY, TEST_STORE_TYPE);
-        properties.setProperty(RestChangeIngestor.NEED_CLIENT_AUTH_KEY, "false");
-
-        restChangeIngestor = new RestChangeIngestor();
-
-        testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
-        ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
-        when(testListener.getDescriptor()).thenReturn("MockChangeListener");
-        when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
-
-        restChangeIngestor.initialize(properties, Mockito.mock(ConfigurationFileHolder.class), testNotifier);
-        restChangeIngestor.setDifferentiator(mockDifferentiator);
-        restChangeIngestor.start();
-
-        OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder();
-
-        // Client key material
-        final KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        keyManagerFactory.init(readKeyStore(TEST_KEYSTORE_PATH), TEST_STORE_PASSWORD.toCharArray());
-
-        // Certificates the client trusts
-        final TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance("X509");
-        trustManagerFactory.init(readKeyStore(TEST_TRUSTSTORE_PATH));
-
-        final X509TrustManager x509TrustManager = firstX509TrustManager(trustManagerFactory.getTrustManagers());
-
-        SSLContext tempSslContext;
-        try {
-            tempSslContext = SSLContext.getInstance("TLS");
-        } catch (NoSuchAlgorithmException e) {
-            tempSslContext = SSLContext.getDefault();
-        }
-
-        final SSLContext sslContext = tempSslContext;
-        sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
-
-        final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
-        clientBuilder.sslSocketFactory(sslSocketFactory, x509TrustManager);
-
-        Thread.sleep(1000);
-        url = restChangeIngestor.getURI().toURL().toString();
-        client = clientBuilder.build();
-    }
-
-    /**
-     * Closes the ingestor and drops the HTTP client reference once all tests have run.
-     */
-    @AfterClass
-    public static void stop() throws Exception {
-        restChangeIngestor.close();
-        client = null;
-    }
-
-    /**
-     * Returns the first trust manager as an {@link X509TrustManager}.
-     *
-     * @throws IllegalStateException if the array is null, empty, or its first entry is not an X509TrustManager
-     */
-    private static X509TrustManager firstX509TrustManager(TrustManager[] trustManagers) {
-        if (trustManagers == null || trustManagers.length == 0 || !(trustManagers[0] instanceof X509TrustManager)) {
-            throw new IllegalStateException("No X509TrustManager available from the trust manager factory");
-        }
-        return (X509TrustManager) trustManagers[0];
-    }
-
-    /**
-     * Loads the test keystore at {@code path} using the shared test store type and password.
-     * The stream is closed before returning, including when loading fails.
-     */
-    private static KeyStore readKeyStore(String path) throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
-        final KeyStore ks = KeyStore.getInstance(TEST_STORE_TYPE);
-        try (FileInputStream fis = new FileInputStream(path)) {
-            ks.load(fis, TEST_STORE_PASSWORD.toCharArray());
-        }
-        return ks;
-    }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
new file mode 100644
index 0000000..229e33d
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/PullHttpChangeIngestorCommonTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
+
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor;
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+import org.eclipse.jetty.server.handler.HandlerCollection;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Properties;
+
+import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Shared test cases for {@link PullHttpChangeIngestor}.
+ *
+ * Subclasses call {@link #init()} to create the Jetty server, attach and start the connector
+ * they want to exercise, and implement {@link #pullHttpChangeIngestorInit(Properties)} to build
+ * the ingestor under test.
+ */
+public abstract class PullHttpChangeIngestorCommonTest {
+
+    public static volatile Server jetty;
+    public static volatile int port;
+    public static volatile PullHttpChangeIngestor pullHttpChangeIngestor;
+    public static ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
+    @SuppressWarnings("unchecked")
+    public static Differentiator<ByteBuffer> mockDifferentiator = Mockito.mock(Differentiator.class);
+    public static final String RESPONSE_STRING = "test";
+    public static final String PATH_RESPONSE_STRING = "path";
+    // Encoded explicitly as UTF-8 to match the charset the test handler declares on its responses
+    public static ByteBuffer configBuffer = ByteBuffer.wrap(RESPONSE_STRING.getBytes(StandardCharsets.UTF_8));
+    public static ByteBuffer pathConfigBuffer = ByteBuffer.wrap(PATH_RESPONSE_STRING.getBytes(StandardCharsets.UTF_8));
+    public static final String ETAG = "testEtag";
+    public static final String QUOTED_ETAG = "\"testEtag\"";
+
+    /**
+     * Creates the shared Jetty server on a daemon thread pool and installs the test handler.
+     * The server is not started here; subclasses add a connector and start it.
+     */
+    public static void init() {
+        QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
+        queuedThreadPool.setDaemon(true);
+        jetty = new Server(queuedThreadPool);
+
+        HandlerCollection handlerCollection = new HandlerCollection(true);
+        handlerCollection.addHandler(new JettyHandler(RESPONSE_STRING, PATH_RESPONSE_STRING));
+        jetty.setHandler(handlerCollection);
+    }
+
+    /**
+     * Creates and initialises {@link #pullHttpChangeIngestor} from the given properties.
+     *
+     * @param properties properties pre-populated by the test; implementations add connection settings
+     */
+    public abstract void pullHttpChangeIngestorInit(Properties properties);
+
+    /**
+     * Resets the shared notifier mock so invocation counts do not leak between tests.
+     */
+    @Before
+    public void before() {
+        Mockito.reset(testNotifier);
+        ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        when(testListener.getDescriptor()).thenReturn("MockChangeListener");
+        Mockito.when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
+    }
+
+    /**
+     * Stops the shared Jetty server, if one was created.
+     */
+    @AfterClass
+    public static void shutdown() throws Exception {
+        // If set-up failed before init() ran there is no server to stop
+        final Server server = jetty;
+        if (server != null) {
+            server.stop();
+        }
+    }
+
+    @Test
+    public void testNewUpdate() throws IOException {
+        Properties properties = new Properties();
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setUseEtag(false);
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer.asReadOnlyBuffer()));
+    }
+
+    @Test
+    public void testNoUpdate() throws IOException {
+        Properties properties = new Properties();
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setUseEtag(false);
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any());
+    }
+
+    @Test
+    public void testUseEtag() throws IOException {
+        Properties properties = new Properties();
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setLastEtag("");
+
+        pullHttpChangeIngestor.setUseEtag(true);
+
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer.asReadOnlyBuffer()));
+
+        // The second run must not notify again: the count stays at one
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.any());
+    }
+
+    @Test
+    public void testNewUpdateWithPath() throws IOException {
+        Properties properties = new Properties();
+        properties.put(PATH_KEY, "/config.yml");
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setUseEtag(false);
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(pathConfigBuffer.asReadOnlyBuffer()));
+    }
+
+    @Test
+    public void testNoUpdateWithPath() throws IOException {
+        Properties properties = new Properties();
+        properties.put(PATH_KEY, "/config.yml");
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setUseEtag(false);
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any());
+    }
+
+    @Test
+    public void testUseEtagWithPath() throws IOException {
+        Properties properties = new Properties();
+        properties.put(PATH_KEY, "/config.yml");
+        pullHttpChangeIngestorInit(properties);
+        pullHttpChangeIngestor.setLastEtag("");
+
+        pullHttpChangeIngestor.setUseEtag(true);
+
+        when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
+
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(pathConfigBuffer.asReadOnlyBuffer()));
+
+        // The second run must not notify again: the count stays at one
+        pullHttpChangeIngestor.run();
+
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.any());
+    }
+
+    /**
+     * Jetty handler that serves one of two fixed bodies for GET requests and answers 304 when
+     * the request's If-None-Match header equals {@link #QUOTED_ETAG}.
+     */
+    static class JettyHandler extends AbstractHandler {
+        volatile String configResponse;
+        volatile String pathResponse;
+
+        public JettyHandler(String configResponse, String pathResponse) {
+            this.configResponse = configResponse;
+            this.pathResponse = pathResponse;
+        }
+
+        @Override
+        public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
+                throws IOException, ServletException {
+
+            baseRequest.setHandled(true);
+
+            if (!"GET".equals(request.getMethod())) {
+                writeOutput(response, "not a GET request", 404);
+                return;
+            }
+
+            if (QUOTED_ETAG.equals(baseRequest.getHeader("If-None-Match"))) {
+                // Client already holds the current version: status only, no body
+                writeOutput(response, null, 304);
+            } else if ("/config.yml".equals(baseRequest.getPathInfo())) {
+                writeOutput(response, pathResponse, 200);
+            } else {
+                writeOutput(response, configResponse, 200);
+            }
+        }
+
+        /**
+         * Writes the status, the ETag header and, when {@code responseBuffer} is non-null, a
+         * UTF-8 text/plain body.
+         */
+        private void writeOutput(HttpServletResponse response, String responseBuffer, int responseCode) throws IOException {
+            response.setStatus(responseCode);
+            response.setHeader("ETag", ETAG);
+            if (responseBuffer != null) {
+                response.setContentType("text/plain");
+                // Content-Length counts encoded bytes, not characters
+                response.setContentLength(responseBuffer.getBytes(StandardCharsets.UTF_8).length);
+                response.setCharacterEncoding(StandardCharsets.UTF_8.name());
+                try (PrintWriter writer = response.getWriter()) {
+                    writer.print(responseBuffer);
+                    writer.flush();
+                }
+            }
+        }
+
+    }
+}
[3/4] nifi-minifi git commit: MINIFI-279: Makes test classes naming
consistent.
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/RestChangeIngestorCommonTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/RestChangeIngestorCommonTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/RestChangeIngestorCommonTest.java
new file mode 100644
index 0000000..2f4f7a3
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/RestChangeIngestorCommonTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
+
+import okhttp3.Headers;
+import okhttp3.MediaType;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.RequestBody;
+import okhttp3.Response;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
+import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
+import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
+import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
+import org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Shared test cases for {@link RestChangeIngestor}.
+ *
+ * Subclasses are expected to populate {@link #client}, {@link #restChangeIngestor},
+ * {@link #url} and {@link #testNotifier} before the tests run.
+ */
+public abstract class RestChangeIngestorCommonTest {
+
+    private static final String TEST_STRING = "This is a test string.";
+
+    public static OkHttpClient client;
+    public static RestChangeIngestor restChangeIngestor;
+    public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8");
+    public static String url;
+    public static ConfigurationChangeNotifier testNotifier;
+    @SuppressWarnings("unchecked")
+    public static Differentiator<InputStream> mockDifferentiator = Mockito.mock(Differentiator.class);
+
+    /**
+     * Resets the shared notifier mock so invocation counts do not leak between tests.
+     */
+    @Before
+    public void before() {
+        Mockito.reset(testNotifier);
+        ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
+        when(testListener.getDescriptor()).thenReturn("MockChangeListener");
+        Mockito.when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
+    }
+
+    @Test
+    public void testGet() throws Exception {
+        Request request = new Request.Builder()
+                .url(url)
+                .build();
+
+        Response response = client.newCall(request).execute();
+        if (!response.isSuccessful()) {
+            throw new IOException("Unexpected code " + response);
+        }
+
+        printHeaders(response);
+
+        assertEquals(RestChangeIngestor.GET_TEXT, response.body().string());
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any(ByteBuffer.class));
+    }
+
+    @Test
+    public void testFileUploadNewConfig() throws Exception {
+        when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
+
+        Response response = client.newCall(uploadRequest()).execute();
+        if (!response.isSuccessful()) {
+            throw new IOException("Unexpected code " + response);
+        }
+
+        printHeaders(response);
+
+        assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string());
+
+        // The body is posted with charset=utf-8, so the expected bytes are encoded the same way
+        verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(ByteBuffer.wrap(TEST_STRING.getBytes(java.nio.charset.StandardCharsets.UTF_8))));
+    }
+
+    @Test
+    public void testFileUploadSameConfig() throws Exception {
+        when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
+
+        Response response = client.newCall(uploadRequest()).execute();
+        // An unchanged configuration must be answered with a non-success status
+        if (response.isSuccessful()) {
+            throw new IOException("Unexpected code " + response);
+        }
+
+        printHeaders(response);
+
+        assertEquals("Request received but instance is already running this config.", response.body().string());
+
+        verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any());
+    }
+
+    /**
+     * Builds the POST request that uploads the test string to the ingestor.
+     */
+    private static Request uploadRequest() {
+        return new Request.Builder()
+                .url(url)
+                .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, TEST_STRING))
+                .addHeader("charset","UTF-8")
+                .build();
+    }
+
+    /**
+     * Prints every response header to standard output as {@code name: value}.
+     */
+    private static void printHeaders(Response response) {
+        Headers responseHeaders = response.headers();
+        for (int i = 0; i < responseHeaders.size(); i++) {
+            System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestPullHttpChangeIngestorCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestPullHttpChangeIngestorCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestPullHttpChangeIngestorCommon.java
deleted file mode 100644
index 53e66d7..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestPullHttpChangeIngestorCommon.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
-
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
-import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
-import org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor;
-import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.handler.AbstractHandler;
-import org.eclipse.jetty.server.handler.HandlerCollection;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.Properties;
-
-import static org.apache.nifi.minifi.bootstrap.configuration.ingestors.PullHttpChangeIngestor.PATH_KEY;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public abstract class TestPullHttpChangeIngestorCommon {
-
- public static volatile Server jetty;
- public static volatile int port;
- public static volatile PullHttpChangeIngestor pullHttpChangeIngestor;
- public static ConfigurationChangeNotifier testNotifier = Mockito.mock(ConfigurationChangeNotifier.class);
- public static Differentiator<ByteBuffer> mockDifferentiator = Mockito.mock(Differentiator.class);
- public static final String RESPONSE_STRING = "test";
- public static final String PATH_RESPONSE_STRING = "path";
- public static ByteBuffer configBuffer= ByteBuffer.wrap(RESPONSE_STRING.getBytes());
- public static ByteBuffer pathConfigBuffer= ByteBuffer.wrap(PATH_RESPONSE_STRING.getBytes());
- public static final String ETAG = "testEtag";
- public static final String QUOTED_ETAG = "\"testEtag\"";
-
- public static void init() {
- QueuedThreadPool queuedThreadPool = new QueuedThreadPool();
- queuedThreadPool.setDaemon(true);
- jetty = new Server(queuedThreadPool);
-
- HandlerCollection handlerCollection = new HandlerCollection(true);
- handlerCollection.addHandler(new JettyHandler(RESPONSE_STRING, PATH_RESPONSE_STRING));
- jetty.setHandler(handlerCollection);
- }
-
- public abstract void pullHttpChangeIngestorInit(Properties properties);
-
- @Before
- public void before() {
- Mockito.reset(testNotifier);
- ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
- when(testListener.getDescriptor()).thenReturn("MockChangeListener");
- Mockito.when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
- }
-
- @AfterClass
- public static void shutdown() throws Exception {
- jetty.stop();
- }
-
- @Test
- public void testNewUpdate() throws IOException {
- Properties properties = new Properties();
- pullHttpChangeIngestorInit(properties);
- pullHttpChangeIngestor.setUseEtag(false);
- when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
-
- pullHttpChangeIngestor.run();
-
- verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer.asReadOnlyBuffer()));
- }
-
-
- @Test
- public void testNoUpdate() throws IOException {
- Properties properties = new Properties();
- pullHttpChangeIngestorInit(properties);
- pullHttpChangeIngestor.setUseEtag(false);
- when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
-
- pullHttpChangeIngestor.run();
-
- verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any());
- }
-
- @Test
- public void testUseEtag() throws IOException {
- Properties properties = new Properties();
- pullHttpChangeIngestorInit(properties);
- pullHttpChangeIngestor.setLastEtag("");
-
- pullHttpChangeIngestor.setUseEtag(true);
-
- when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
-
- pullHttpChangeIngestor.run();
-
- verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(configBuffer));
-
- pullHttpChangeIngestor.run();
-
- verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.any());
-
- }
-
- @Test
- public void testNewUpdateWithPath() throws IOException {
- Properties properties = new Properties();
- properties.put(PATH_KEY, "/config.yml");
- pullHttpChangeIngestorInit(properties);
- pullHttpChangeIngestor.setUseEtag(false);
- when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
-
- pullHttpChangeIngestor.run();
-
- verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(pathConfigBuffer.asReadOnlyBuffer()));
- }
-
- @Test
- public void testNoUpdateWithPath() throws IOException {
- Properties properties = new Properties();
- properties.put(PATH_KEY, "/config.yml");
- pullHttpChangeIngestorInit(properties);
- pullHttpChangeIngestor.setUseEtag(false);
- when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(false);
-
- pullHttpChangeIngestor.run();
-
- verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any());
- }
-
- @Test
- public void testUseEtagWithPath() throws IOException {
- Properties properties = new Properties();
- properties.put(PATH_KEY, "/config.yml");
- pullHttpChangeIngestorInit(properties);
- pullHttpChangeIngestor.setLastEtag("");
-
- pullHttpChangeIngestor.setUseEtag(true);
-
- when(mockDifferentiator.isNew(Mockito.any(ByteBuffer.class))).thenReturn(true);
-
- pullHttpChangeIngestor.run();
-
- verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(pathConfigBuffer.asReadOnlyBuffer()));
-
- pullHttpChangeIngestor.run();
-
- verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.any());
-
- }
-
- static class JettyHandler extends AbstractHandler {
- volatile String configResponse;
- volatile String pathResponse;
-
- public JettyHandler(String configResponse, String pathResponse){
- this.configResponse = configResponse;
- this.pathResponse = pathResponse;
- }
-
-
- @Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
- throws IOException, ServletException {
-
- baseRequest.setHandled(true);
-
- if ("GET".equals(request.getMethod())) {
-
- if (QUOTED_ETAG.equals(baseRequest.getHeader("If-None-Match"))){
- writeOutput(response, null, 304);
- } else {
-
- if ("/config.yml".equals(baseRequest.getPathInfo())) {
- writeOutput(response, pathResponse, 200);
- } else {
- writeOutput(response, configResponse, 200);
- }
- }
-
-
- } else {
- writeOutput(response, "not a GET request", 404);
- }
- }
-
- private void writeOutput(HttpServletResponse response, String responseBuffer, int responseCode) throws IOException {
- response.setStatus(responseCode);
- response.setHeader("ETag", ETAG);
- if (responseBuffer != null) {
- response.setContentType("text/plain");
- response.setContentLength(responseBuffer.length());
- response.setCharacterEncoding(StandardCharsets.UTF_8.displayName());
- try (PrintWriter writer = response.getWriter()) {
- writer.print(responseBuffer);
- writer.flush();
- }
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestRestChangeIngestorCommon.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestRestChangeIngestorCommon.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestRestChangeIngestorCommon.java
deleted file mode 100644
index 72e768a..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/configuration/ingestors/common/TestRestChangeIngestorCommon.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.configuration.ingestors.common;
-
-import okhttp3.Headers;
-import okhttp3.MediaType;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.RequestBody;
-import okhttp3.Response;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeListener;
-import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeNotifier;
-import org.apache.nifi.minifi.bootstrap.configuration.ListenerHandleResult;
-import org.apache.nifi.minifi.bootstrap.configuration.differentiators.interfaces.Differentiator;
-import org.apache.nifi.minifi.bootstrap.configuration.ingestors.RestChangeIngestor;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-public abstract class TestRestChangeIngestorCommon {
-
- private static String testString = "This is a test string.";
-
- public static OkHttpClient client;
- public static RestChangeIngestor restChangeIngestor;
- public static final MediaType MEDIA_TYPE_MARKDOWN = MediaType.parse("text/x-markdown; charset=utf-8");
- public static String url;
- public static ConfigurationChangeNotifier testNotifier;
- public static Differentiator<InputStream> mockDifferentiator = Mockito.mock(Differentiator.class);
-
-
- @Before
- public void before() {
- Mockito.reset(testNotifier);
- ConfigurationChangeListener testListener = Mockito.mock(ConfigurationChangeListener.class);
- when(testListener.getDescriptor()).thenReturn("MockChangeListener");
- Mockito.when(testNotifier.notifyListeners(Mockito.any())).thenReturn(Collections.singleton(new ListenerHandleResult(testListener)));
- }
-
- @Test
- public void testGet() throws Exception {
- Request request = new Request.Builder()
- .url(url)
- .build();
-
- Response response = client.newCall(request).execute();
- if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
- Headers responseHeaders = response.headers();
- for (int i = 0; i < responseHeaders.size(); i++) {
- System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
- }
-
- assertEquals(RestChangeIngestor.GET_TEXT, response.body().string());
- verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any(ByteBuffer.class));
- }
-
- @Test
- public void testFileUploadNewConfig() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(true);
-
- Request request = new Request.Builder()
- .url(url)
- .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, testString))
- .addHeader("charset","UTF-8")
- .build();
-
- Response response = client.newCall(request).execute();
- if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
- Headers responseHeaders = response.headers();
- for (int i = 0; i < responseHeaders.size(); i++) {
- System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
- }
-
- assertEquals("The result of notifying listeners:\nMockChangeListener successfully handled the configuration change\n", response.body().string());
-
- verify(testNotifier, Mockito.times(1)).notifyListeners(Mockito.eq(ByteBuffer.wrap(testString.getBytes())));
- }
-
- @Test
- public void testFileUploadSameConfig() throws Exception {
- when(mockDifferentiator.isNew(Mockito.any(InputStream.class))).thenReturn(false);
-
- Request request = new Request.Builder()
- .url(url)
- .post(RequestBody.create(MEDIA_TYPE_MARKDOWN, testString))
- .addHeader("charset","UTF-8")
- .build();
-
- Response response = client.newCall(request).execute();
- if (response.isSuccessful()) throw new IOException("Unexpected code " + response);
-
- Headers responseHeaders = response.headers();
- for (int i = 0; i < responseHeaders.size(); i++) {
- System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
- }
-
- assertEquals("Request received but instance is already running this config.", response.body().string());
-
- verify(testNotifier, Mockito.never()).notifyListeners(Mockito.any());
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/status/reporters/StatusLoggerTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/status/reporters/StatusLoggerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/status/reporters/StatusLoggerTest.java
new file mode 100644
index 0000000..576dc4e
--- /dev/null
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/status/reporters/StatusLoggerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.bootstrap.status.reporters;
+
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.minifi.bootstrap.QueryableStatusAggregator;
+import org.apache.nifi.minifi.commons.status.FlowStatusReport;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Properties;
+
+import static org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger.ENCOUNTERED_IO_EXCEPTION;
+import static org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger.LOGGING_LEVEL_KEY;
+import static org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger.QUERY_KEY;
+import static org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger.REPORT_PERIOD_KEY;
+import static org.mockito.Mockito.verify;
+
+public class StatusLoggerTest {
+
+ private static final String MOCK_STATUS = "FlowStatusReport{controllerServiceStatusList=null, processorStatusList=[{name='TailFile', processorHealth={runStatus='Running', hasBulletins=false, " +
+ "validationErrorList=[]}, processorStats=null, bulletinList=null}], connectionStatusList=null, remoteProcessingGroupStatusList=null, instanceStatus=null, systemDiagnosticsStatus=null," +
+ " reportingTaskStatusList=null, errorsGeneratingReport=[]}";
+
+ private static final String MOCK_QUERY = "processor:all:health";
+
+ private StatusLogger statusLogger;
+ private Logger logger;
+ private QueryableStatusAggregator queryableStatusAggregator;
+ private FlowStatusReport flowStatusReport;
+
+ @Before
+ public void init() throws IOException, NoSuchFieldException, IllegalAccessException {
+ statusLogger = Mockito.spy(new StatusLogger());
+
+ logger = Mockito.mock(Logger.class);
+ queryableStatusAggregator = Mockito.mock(QueryableStatusAggregator.class);
+ flowStatusReport = Mockito.mock(FlowStatusReport.class);
+
+ Mockito.when(flowStatusReport.toString()).thenReturn(MOCK_STATUS);
+
+ Field field = StatusLogger.class.getDeclaredField("logger");
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ field.set(null, logger);
+
+
+ Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenReturn(flowStatusReport);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testFailedInitDueToFatalLogLevel(){
+ Properties properties = new Properties();
+ properties.setProperty(REPORT_PERIOD_KEY, "100");
+ properties.setProperty(LOGGING_LEVEL_KEY, LogLevel.FATAL.name());
+ properties.setProperty(QUERY_KEY, MOCK_QUERY);
+
+ statusLogger.initialize(properties, queryableStatusAggregator);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testFailedInitDueToNoPeriod(){
+ Properties properties = new Properties();
+ properties.setProperty(LOGGING_LEVEL_KEY, LogLevel.INFO.name());
+ properties.setProperty(QUERY_KEY, MOCK_QUERY);
+
+ statusLogger.initialize(properties, queryableStatusAggregator);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testFailedInitDueToNoQuery(){
+ Properties properties = new Properties();
+ properties.setProperty(REPORT_PERIOD_KEY, "100");
+ properties.setProperty(LOGGING_LEVEL_KEY, LogLevel.INFO.name());
+
+ statusLogger.initialize(properties, queryableStatusAggregator);
+ }
+
+ @Test
+ public void TestTrace() {
+ statusLogger.initialize(getProperties(LogLevel.TRACE), queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).info(MOCK_STATUS, (Throwable) null);
+ }
+
+ @Test
+ public void TestDebug() {
+ statusLogger.initialize(getProperties(LogLevel.DEBUG), queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).debug(MOCK_STATUS, (Throwable) null);
+ }
+
+ @Test
+ public void TestInfo() {
+ statusLogger.initialize(getProperties(LogLevel.INFO), queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).info(MOCK_STATUS, (Throwable) null);
+ }
+
+ @Test
+ public void TestWarn() {
+ statusLogger.initialize(getProperties(LogLevel.WARN), queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).warn(MOCK_STATUS, (Throwable) null);
+ }
+
+ @Test
+ public void TestError() {
+ statusLogger.initialize(getProperties(LogLevel.ERROR), queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).error(MOCK_STATUS, (Throwable) null);
+ }
+
+ // Exception testing
+ @Test
+ public void TestTraceException() throws IOException {
+ Properties properties = new Properties();
+ properties.setProperty(REPORT_PERIOD_KEY, "100");
+ properties.setProperty(LOGGING_LEVEL_KEY, LogLevel.TRACE.name());
+ properties.setProperty(QUERY_KEY, MOCK_QUERY);
+
+ IOException ioException = new IOException("This is an expected test exception");
+ Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
+
+ statusLogger.initialize(properties, queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).trace(ENCOUNTERED_IO_EXCEPTION, ioException);
+ }
+
+ @Test
+ public void TestDebugException() throws IOException {
+ IOException ioException = new IOException("This is an expected test exception");
+ Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
+
+ statusLogger.initialize(getProperties(LogLevel.DEBUG), queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).debug(ENCOUNTERED_IO_EXCEPTION, ioException);
+ }
+
+ @Test
+ public void TestInfoException() throws IOException {
+ IOException ioException = new IOException("This is an expected test exception");
+ Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
+
+ statusLogger.initialize(getProperties(LogLevel.INFO), queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).info(ENCOUNTERED_IO_EXCEPTION, ioException);
+ }
+
+ @Test
+ public void TestWarnException() throws IOException {
+ IOException ioException = new IOException("This is an expected test exception");
+ Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
+
+ statusLogger.initialize(getProperties(LogLevel.WARN), queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).warn(ENCOUNTERED_IO_EXCEPTION, ioException);
+ }
+
+ @Test
+ public void TestErrorException() throws IOException {
+ IOException ioException = new IOException("This is an expected test exception");
+ Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
+
+ statusLogger.initialize(getProperties(LogLevel.ERROR), queryableStatusAggregator);
+ statusLogger.start();
+
+ verify(logger, Mockito.timeout(300).atLeastOnce()).error(ENCOUNTERED_IO_EXCEPTION, ioException);
+ }
+
+ private static Properties getProperties(LogLevel logLevel){
+ Properties properties = new Properties();
+ properties.setProperty(REPORT_PERIOD_KEY, "100");
+ properties.setProperty(LOGGING_LEVEL_KEY, logLevel.name());
+ properties.setProperty(QUERY_KEY, MOCK_QUERY);
+ return properties;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/status/reporters/TestStatusLogger.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/status/reporters/TestStatusLogger.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/status/reporters/TestStatusLogger.java
deleted file mode 100644
index c7fa78f..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/status/reporters/TestStatusLogger.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.status.reporters;
-
-import org.apache.nifi.logging.LogLevel;
-import org.apache.nifi.minifi.bootstrap.QueryableStatusAggregator;
-import org.apache.nifi.minifi.commons.status.FlowStatusReport;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.Properties;
-
-import static org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger.ENCOUNTERED_IO_EXCEPTION;
-import static org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger.LOGGING_LEVEL_KEY;
-import static org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger.QUERY_KEY;
-import static org.apache.nifi.minifi.bootstrap.status.reporters.StatusLogger.REPORT_PERIOD_KEY;
-import static org.mockito.Mockito.verify;
-
-public class TestStatusLogger {
-
- private static final String MOCK_STATUS = "FlowStatusReport{controllerServiceStatusList=null, processorStatusList=[{name='TailFile', processorHealth={runStatus='Running', hasBulletins=false, " +
- "validationErrorList=[]}, processorStats=null, bulletinList=null}], connectionStatusList=null, remoteProcessingGroupStatusList=null, instanceStatus=null, systemDiagnosticsStatus=null," +
- " reportingTaskStatusList=null, errorsGeneratingReport=[]}";
-
- private static final String MOCK_QUERY = "processor:all:health";
-
- private StatusLogger statusLogger;
- private Logger logger;
- private QueryableStatusAggregator queryableStatusAggregator;
- private FlowStatusReport flowStatusReport;
-
- @Before
- public void init() throws IOException, NoSuchFieldException, IllegalAccessException {
- statusLogger = Mockito.spy(new StatusLogger());
-
- logger = Mockito.mock(Logger.class);
- queryableStatusAggregator = Mockito.mock(QueryableStatusAggregator.class);
- flowStatusReport = Mockito.mock(FlowStatusReport.class);
-
- Mockito.when(flowStatusReport.toString()).thenReturn(MOCK_STATUS);
-
- Field field = StatusLogger.class.getDeclaredField("logger");
- field.setAccessible(true);
- Field modifiersField = Field.class.getDeclaredField("modifiers");
- modifiersField.setAccessible(true);
- modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
- field.set(null, logger);
-
-
- Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenReturn(flowStatusReport);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testFailedInitDueToFatalLogLevel(){
- Properties properties = new Properties();
- properties.setProperty(REPORT_PERIOD_KEY, "100");
- properties.setProperty(LOGGING_LEVEL_KEY, LogLevel.FATAL.name());
- properties.setProperty(QUERY_KEY, MOCK_QUERY);
-
- statusLogger.initialize(properties, queryableStatusAggregator);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testFailedInitDueToNoPeriod(){
- Properties properties = new Properties();
- properties.setProperty(LOGGING_LEVEL_KEY, LogLevel.INFO.name());
- properties.setProperty(QUERY_KEY, MOCK_QUERY);
-
- statusLogger.initialize(properties, queryableStatusAggregator);
- }
-
- @Test(expected = IllegalStateException.class)
- public void testFailedInitDueToNoQuery(){
- Properties properties = new Properties();
- properties.setProperty(REPORT_PERIOD_KEY, "100");
- properties.setProperty(LOGGING_LEVEL_KEY, LogLevel.INFO.name());
-
- statusLogger.initialize(properties, queryableStatusAggregator);
- }
-
- @Test
- public void TestTrace() {
- statusLogger.initialize(getProperties(LogLevel.TRACE), queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).info(MOCK_STATUS, (Throwable) null);
- }
-
- @Test
- public void TestDebug() {
- statusLogger.initialize(getProperties(LogLevel.DEBUG), queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).debug(MOCK_STATUS, (Throwable) null);
- }
-
- @Test
- public void TestInfo() {
- statusLogger.initialize(getProperties(LogLevel.INFO), queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).info(MOCK_STATUS, (Throwable) null);
- }
-
- @Test
- public void TestWarn() {
- statusLogger.initialize(getProperties(LogLevel.WARN), queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).warn(MOCK_STATUS, (Throwable) null);
- }
-
- @Test
- public void TestError() {
- statusLogger.initialize(getProperties(LogLevel.ERROR), queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).error(MOCK_STATUS, (Throwable) null);
- }
-
- // Exception testing
- @Test
- public void TestTraceException() throws IOException {
- Properties properties = new Properties();
- properties.setProperty(REPORT_PERIOD_KEY, "100");
- properties.setProperty(LOGGING_LEVEL_KEY, LogLevel.TRACE.name());
- properties.setProperty(QUERY_KEY, MOCK_QUERY);
-
- IOException ioException = new IOException("This is an expected test exception");
- Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
-
- statusLogger.initialize(properties, queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).trace(ENCOUNTERED_IO_EXCEPTION, ioException);
- }
-
- @Test
- public void TestDebugException() throws IOException {
- IOException ioException = new IOException("This is an expected test exception");
- Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
-
- statusLogger.initialize(getProperties(LogLevel.DEBUG), queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).debug(ENCOUNTERED_IO_EXCEPTION, ioException);
- }
-
- @Test
- public void TestInfoException() throws IOException {
- IOException ioException = new IOException("This is an expected test exception");
- Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
-
- statusLogger.initialize(getProperties(LogLevel.INFO), queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).info(ENCOUNTERED_IO_EXCEPTION, ioException);
- }
-
- @Test
- public void TestWarnException() throws IOException {
- IOException ioException = new IOException("This is an expected test exception");
- Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
-
- statusLogger.initialize(getProperties(LogLevel.WARN), queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).warn(ENCOUNTERED_IO_EXCEPTION, ioException);
- }
-
- @Test
- public void TestErrorException() throws IOException {
- IOException ioException = new IOException("This is an expected test exception");
- Mockito.when(queryableStatusAggregator.statusReport(MOCK_QUERY)).thenThrow(ioException);
-
- statusLogger.initialize(getProperties(LogLevel.ERROR), queryableStatusAggregator);
- statusLogger.start();
-
- verify(logger, Mockito.timeout(300).atLeastOnce()).error(ENCOUNTERED_IO_EXCEPTION, ioException);
- }
-
- private static Properties getProperties(LogLevel logLevel){
- Properties properties = new Properties();
- properties.setProperty(REPORT_PERIOD_KEY, "100");
- properties.setProperty(LOGGING_LEVEL_KEY, logLevel.name());
- properties.setProperty(QUERY_KEY, MOCK_QUERY);
- return properties;
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
index ecd9001..67c5916 100644
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
+++ b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/ConfigTransformerTest.java
@@ -17,7 +17,38 @@
package org.apache.nifi.minifi.bootstrap.util;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
import org.apache.nifi.minifi.bootstrap.configuration.ConfigurationChangeException;
+import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException;
import org.apache.nifi.minifi.commons.schema.ConfigSchema;
import org.apache.nifi.minifi.commons.schema.ConnectionSchema;
import org.apache.nifi.minifi.commons.schema.ControllerServiceSchema;
@@ -36,33 +67,6 @@ import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-import javax.xml.xpath.XPathFactory;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class ConfigTransformerTest {
public static final Map<String, Integer> PG_ELEMENT_ORDER_MAP = generateOrderMap(
Arrays.asList("processor", "inputPort", "outputPort", "funnel", "processGroup", "remoteProcessGroup", "connection"));
@@ -189,6 +193,195 @@ public class ConfigTransformerTest {
}
}
+ @Test
+ public void doesTransformFile() throws Exception {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config.yml", "./target/");
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
+ @Test
+ public void doesTransformV1File() throws Exception {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-v1.yml", "./target/");
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
+ @Test
+ public void doesTransformInputStream() throws Exception {
+ File inputFile = new File("./src/test/resources/config.yml");
+ ConfigTransformer.transformConfigFile(new FileInputStream(inputFile), "./target/");
+
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
+ @Test
+ public void doesTransformOnDefaultFile() throws Exception {
+ ConfigTransformer.transformConfigFile("./src/test/resources/default.yml", "./target/");
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
+ @Test
+ public void doesTransformOnMultipleProcessors() throws Exception {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-multiple-processors.yml", "./target/");
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
+ @Test
+ public void doesTransformOnMultipleRemoteProcessGroups() throws Exception {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-multiple-RPGs.yml", "./target/");
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
+ @Test
+ public void doesTransformOnMultipleInputPorts() throws Exception {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-multiple-input-ports.yml", "./target/");
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
+ @Test
+ public void doesTransformOnMinimal() throws Exception {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-minimal.yml", "./target/");
+ File nifiPropertiesFile = new File("./target/nifi.properties");
+
+ assertTrue(nifiPropertiesFile.exists());
+ assertTrue(nifiPropertiesFile.canRead());
+
+ nifiPropertiesFile.deleteOnExit();
+
+ File flowXml = new File("./target/flow.xml.gz");
+ assertTrue(flowXml.exists());
+ assertTrue(flowXml.canRead());
+
+ flowXml.deleteOnExit();
+ }
+
+ @Test
+ public void handleTransformInvalidFile() throws Exception {
+ try {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-invalid.yml", "./target/");
+ fail("Invalid configuration file was not detected.");
+ } catch (SchemaLoaderException e){
+ assertEquals("Provided YAML configuration is not a Map", e.getMessage());
+ }
+ }
+
+ @Test
+ public void handleTransformMalformedField() throws Exception {
+ try {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-malformed-field.yml", "./target/");
+ fail("Invalid configuration file was not detected.");
+ } catch (InvalidConfigurationException e){
+ assertEquals("Failed to transform config file due to:['threshold' in section 'Swap' because it is found but could not be parsed as a Number]", e.getMessage());
+ }
+ }
+
+ @Test
+ public void handleTransformEmptyFile() throws Exception {
+ try {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-empty.yml", "./target/");
+ fail("Invalid configuration file was not detected.");
+ } catch (SchemaLoaderException e){
+ assertEquals("Provided YAML configuration is not a Map", e.getMessage());
+ }
+ }
+
+ @Test
+ public void handleTransformFileMissingRequiredField() throws Exception {
+ try {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-missing-required-field.yml", "./target/");
+ fail("Invalid configuration file was not detected.");
+ } catch (InvalidConfigurationException e){
+ assertEquals("Failed to transform config file due to:['class' in section 'Processors' because it was not found and it is required]", e.getMessage());
+ }
+ }
+
+ @Test
+ public void handleTransformFileMultipleProblems() throws Exception {
+ try {
+ ConfigTransformer.transformConfigFile("./src/test/resources/config-multiple-problems.yml", "./target/");
+ fail("Invalid configuration file was not detected.");
+ } catch (InvalidConfigurationException e){
+ assertEquals("Failed to transform config file due to:['class' in section 'Processors' because it was not found and it is required], " +
+ "['scheduling strategy' in section 'Provenance Reporting' because it is not a valid scheduling strategy], " +
+ "['source name' in section 'Connections' because it was not found and it is required]", e.getMessage());
+ }
+ }
+
public void testConfigFileTransform(String configFile) throws Exception {
ConfigSchema configSchema = SchemaLoader.loadConfigSchemaFromYaml(ConfigTransformerTest.class.getClassLoader().getResourceAsStream(configFile));
@@ -354,7 +547,7 @@ public class ConfigTransformerTest {
Element item = (Element) propertyElements.item(i);
properties.put(getText(item, "name"), getText(item, "value"));
}
- assertEquals(expected.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> nullToEmpty(e.getValue()))), properties);
+ assertEquals(expected.entrySet().stream().collect(Collectors.toMap(Map.Entry<String, Object>::getKey, e -> nullToEmpty(e.getValue()))), properties);
}
private String getText(Element element, String path) throws XPathExpressionException {
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
----------------------------------------------------------------------
diff --git a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java b/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
deleted file mode 100644
index d644ed3..0000000
--- a/minifi-bootstrap/src/test/java/org/apache/nifi/minifi/bootstrap/util/TestConfigTransformer.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.bootstrap.util;
-
-import org.apache.nifi.minifi.bootstrap.exception.InvalidConfigurationException;
-import org.apache.nifi.minifi.commons.schema.exception.SchemaLoaderException;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.FileInputStream;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class TestConfigTransformer {
-
- @Test
- public void doesTransformFile() throws Exception {
- ConfigTransformer.transformConfigFile("./src/test/resources/config.yml", "./target/");
- File nifiPropertiesFile = new File("./target/nifi.properties");
-
- assertTrue(nifiPropertiesFile.exists());
- assertTrue(nifiPropertiesFile.canRead());
-
- nifiPropertiesFile.deleteOnExit();
-
- File flowXml = new File("./target/flow.xml.gz");
- assertTrue(flowXml.exists());
- assertTrue(flowXml.canRead());
-
- flowXml.deleteOnExit();
- }
-
- @Test
- public void doesTransformV1File() throws Exception {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-v1.yml", "./target/");
- File nifiPropertiesFile = new File("./target/nifi.properties");
-
- assertTrue(nifiPropertiesFile.exists());
- assertTrue(nifiPropertiesFile.canRead());
-
- nifiPropertiesFile.deleteOnExit();
-
- File flowXml = new File("./target/flow.xml.gz");
- assertTrue(flowXml.exists());
- assertTrue(flowXml.canRead());
-
- flowXml.deleteOnExit();
- }
-
- @Test
- public void doesTransformInputStream() throws Exception {
- File inputFile = new File("./src/test/resources/config.yml");
- ConfigTransformer.transformConfigFile(new FileInputStream(inputFile), "./target/");
-
- File nifiPropertiesFile = new File("./target/nifi.properties");
- assertTrue(nifiPropertiesFile.exists());
- assertTrue(nifiPropertiesFile.canRead());
-
- nifiPropertiesFile.deleteOnExit();
-
- File flowXml = new File("./target/flow.xml.gz");
- assertTrue(flowXml.exists());
- assertTrue(flowXml.canRead());
-
- flowXml.deleteOnExit();
- }
-
- @Test
- public void doesTransformOnDefaultFile() throws Exception {
- ConfigTransformer.transformConfigFile("./src/test/resources/default.yml", "./target/");
- File nifiPropertiesFile = new File("./target/nifi.properties");
-
- assertTrue(nifiPropertiesFile.exists());
- assertTrue(nifiPropertiesFile.canRead());
-
- nifiPropertiesFile.deleteOnExit();
-
- File flowXml = new File("./target/flow.xml.gz");
- assertTrue(flowXml.exists());
- assertTrue(flowXml.canRead());
-
- flowXml.deleteOnExit();
- }
-
- @Test
- public void doesTransformOnMultipleProcessors() throws Exception {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-multiple-processors.yml", "./target/");
- File nifiPropertiesFile = new File("./target/nifi.properties");
-
- assertTrue(nifiPropertiesFile.exists());
- assertTrue(nifiPropertiesFile.canRead());
-
- nifiPropertiesFile.deleteOnExit();
-
- File flowXml = new File("./target/flow.xml.gz");
- assertTrue(flowXml.exists());
- assertTrue(flowXml.canRead());
-
- flowXml.deleteOnExit();
- }
-
- @Test
- public void doesTransformOnMultipleRemoteProcessGroups() throws Exception {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-multiple-RPGs.yml", "./target/");
- File nifiPropertiesFile = new File("./target/nifi.properties");
-
- assertTrue(nifiPropertiesFile.exists());
- assertTrue(nifiPropertiesFile.canRead());
-
- nifiPropertiesFile.deleteOnExit();
-
- File flowXml = new File("./target/flow.xml.gz");
- assertTrue(flowXml.exists());
- assertTrue(flowXml.canRead());
-
- flowXml.deleteOnExit();
- }
-
- @Test
- public void doesTransformOnMultipleInputPorts() throws Exception {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-multiple-input-ports.yml", "./target/");
- File nifiPropertiesFile = new File("./target/nifi.properties");
-
- assertTrue(nifiPropertiesFile.exists());
- assertTrue(nifiPropertiesFile.canRead());
-
- nifiPropertiesFile.deleteOnExit();
-
- File flowXml = new File("./target/flow.xml.gz");
- assertTrue(flowXml.exists());
- assertTrue(flowXml.canRead());
-
- flowXml.deleteOnExit();
- }
-
- @Test
- public void doesTransformOnMinimal() throws Exception {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-minimal.yml", "./target/");
- File nifiPropertiesFile = new File("./target/nifi.properties");
-
- assertTrue(nifiPropertiesFile.exists());
- assertTrue(nifiPropertiesFile.canRead());
-
- nifiPropertiesFile.deleteOnExit();
-
- File flowXml = new File("./target/flow.xml.gz");
- assertTrue(flowXml.exists());
- assertTrue(flowXml.canRead());
-
- flowXml.deleteOnExit();
- }
-
- @Test
- public void handleTransformInvalidFile() throws Exception {
- try {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-invalid.yml", "./target/");
- fail("Invalid configuration file was not detected.");
- } catch (SchemaLoaderException e){
- assertEquals("Provided YAML configuration is not a Map", e.getMessage());
- }
- }
-
- @Test
- public void handleTransformMalformedField() throws Exception {
- try {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-malformed-field.yml", "./target/");
- fail("Invalid configuration file was not detected.");
- } catch (InvalidConfigurationException e){
- assertEquals("Failed to transform config file due to:['threshold' in section 'Swap' because it is found but could not be parsed as a Number]", e.getMessage());
- }
- }
-
- @Test
- public void handleTransformEmptyFile() throws Exception {
- try {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-empty.yml", "./target/");
- fail("Invalid configuration file was not detected.");
- } catch (SchemaLoaderException e){
- assertEquals("Provided YAML configuration is not a Map", e.getMessage());
- }
- }
-
- @Test
- public void handleTransformFileMissingRequiredField() throws Exception {
- try {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-missing-required-field.yml", "./target/");
- fail("Invalid configuration file was not detected.");
- } catch (InvalidConfigurationException e){
- assertEquals("Failed to transform config file due to:['class' in section 'Processors' because it was not found and it is required]", e.getMessage());
- }
- }
-
- @Test
- public void handleTransformFileMultipleProblems() throws Exception {
- try {
- ConfigTransformer.transformConfigFile("./src/test/resources/config-multiple-problems.yml", "./target/");
- fail("Invalid configuration file was not detected.");
- } catch (InvalidConfigurationException e){
- assertEquals("Failed to transform config file due to:['class' in section 'Processors' because it was not found and it is required], " +
- "['scheduling strategy' in section 'Provenance Reporting' because it is not a valid scheduling strategy], " +
- "['source name' in section 'Connections' because it was not found and it is required]", e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-commons/minifi-utils/src/test/java/org/apache/nifi/minifi/commons/status/StatusReportTest.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-utils/src/test/java/org/apache/nifi/minifi/commons/status/StatusReportTest.java b/minifi-commons/minifi-utils/src/test/java/org/apache/nifi/minifi/commons/status/StatusReportTest.java
new file mode 100644
index 0000000..8fc442f
--- /dev/null
+++ b/minifi-commons/minifi-utils/src/test/java/org/apache/nifi/minifi/commons/status/StatusReportTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.commons.status;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addConnectionStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addControllerServiceStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addExpectedRemoteProcessGroupStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addInstanceStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addProcessorStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addReportingTaskStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addSystemDiagnosticStatus;
+import static org.junit.Assert.assertEquals;
+
+public class StatusReportTest {
+
+ @Test
+ public void verifySerializableFullyPopulated() throws IOException, ClassNotFoundException {
+ FlowStatusReport original = new FlowStatusReport();
+
+ addControllerServiceStatus(original, true, true, true, true);
+ addInstanceStatus(original, true, true, true, true);
+ addSystemDiagnosticStatus(original, true, true, true, true, true);
+ addReportingTaskStatus(original, true, true, true, true);
+ addConnectionStatus(original, true, true);
+ addProcessorStatus(original, true, true, true, true, true);
+ addExpectedRemoteProcessGroupStatus(original, true, true, true, true, true, true);
+
+ byte[] byteArrayCopy = serialize(original);
+ FlowStatusReport copy = unSerialize(byteArrayCopy, FlowStatusReport.class);
+
+ assertEquals(original, copy);
+ }
+
+ @Test
+ public void verifySerializableSomeNull() throws IOException, ClassNotFoundException {
+ FlowStatusReport original = new FlowStatusReport();
+
+ addControllerServiceStatus(original, true, true, true, true);
+ addInstanceStatus(original, true, true, true, true);
+ addSystemDiagnosticStatus(original, true, true, true, true, true);
+ addProcessorStatus(original, true, true, true, true, true);
+ addExpectedRemoteProcessGroupStatus(original, true, true, true, true, true, true);
+
+ byte[] byteArrayCopy = serialize(original);
+ FlowStatusReport copy = unSerialize(byteArrayCopy, FlowStatusReport.class);
+
+ assertEquals(original, copy);
+ }
+
+ private static <T extends Serializable> byte[] serialize(T obj) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(obj);
+ oos.close();
+ return baos.toByteArray();
+ }
+
+ private static <T extends Serializable> T unSerialize(byte[] b, Class<T> cl) throws IOException, ClassNotFoundException {
+ ByteArrayInputStream bais = new ByteArrayInputStream(b);
+ ObjectInputStream ois = new ObjectInputStream(bais);
+ Object o = ois.readObject();
+ return cl.cast(o);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-commons/minifi-utils/src/test/java/org/apache/nifi/minifi/commons/status/TestStatusReport.java
----------------------------------------------------------------------
diff --git a/minifi-commons/minifi-utils/src/test/java/org/apache/nifi/minifi/commons/status/TestStatusReport.java b/minifi-commons/minifi-utils/src/test/java/org/apache/nifi/minifi/commons/status/TestStatusReport.java
deleted file mode 100644
index fc697e3..0000000
--- a/minifi-commons/minifi-utils/src/test/java/org/apache/nifi/minifi/commons/status/TestStatusReport.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.commons.status;
-
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addConnectionStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addControllerServiceStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addExpectedRemoteProcessGroupStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addInstanceStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addProcessorStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addReportingTaskStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addSystemDiagnosticStatus;
-import static org.junit.Assert.assertEquals;
-
-public class TestStatusReport {
-
- @Test
- public void verifySerializableFullyPopulated() throws IOException, ClassNotFoundException {
- FlowStatusReport original = new FlowStatusReport();
-
- addControllerServiceStatus(original, true, true, true, true);
- addInstanceStatus(original, true, true, true, true);
- addSystemDiagnosticStatus(original, true, true, true, true, true);
- addReportingTaskStatus(original, true, true, true, true);
- addConnectionStatus(original, true, true);
- addProcessorStatus(original, true, true, true, true, true);
- addExpectedRemoteProcessGroupStatus(original, true, true, true, true, true, true);
-
- byte[] byteArrayCopy = serialize(original);
- FlowStatusReport copy = unSerialize(byteArrayCopy, FlowStatusReport.class);
-
- assertEquals(original, copy);
- }
-
- @Test
- public void verifySerializableSomeNull() throws IOException, ClassNotFoundException {
- FlowStatusReport original = new FlowStatusReport();
-
- addControllerServiceStatus(original, true, true, true, true);
- addInstanceStatus(original, true, true, true, true);
- addSystemDiagnosticStatus(original, true, true, true, true, true);
- addProcessorStatus(original, true, true, true, true, true);
- addExpectedRemoteProcessGroupStatus(original, true, true, true, true, true, true);
-
- byte[] byteArrayCopy = serialize(original);
- FlowStatusReport copy = unSerialize(byteArrayCopy, FlowStatusReport.class);
-
- assertEquals(original, copy);
- }
-
- private static <T extends Serializable> byte[] serialize(T obj) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream(baos);
- oos.writeObject(obj);
- oos.close();
- return baos.toByteArray();
- }
-
- private static <T extends Serializable> T unSerialize(byte[] b, Class<T> cl) throws IOException, ClassNotFoundException {
- ByteArrayInputStream bais = new ByteArrayInputStream(b);
- ObjectInputStream ois = new ObjectInputStream(bais);
- Object o = ois.readObject();
- return cl.cast(o);
- }
-}
[2/4] nifi-minifi git commit: MINIFI-279: Makes test classes naming
consistent.
Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/StatusConfigReporterTest.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/StatusConfigReporterTest.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/StatusConfigReporterTest.java
new file mode 100644
index 0000000..171400a
--- /dev/null
+++ b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/StatusConfigReporterTest.java
@@ -0,0 +1,875 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.minifi.status;
+
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.ConfiguredComponent;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.RunStatus;
+import org.apache.nifi.controller.status.TransmissionStatus;
+import org.apache.nifi.diagnostics.GarbageCollection;
+import org.apache.nifi.diagnostics.StorageUsage;
+import org.apache.nifi.diagnostics.SystemDiagnostics;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
+import org.apache.nifi.minifi.commons.status.FlowStatusReport;
+import org.apache.nifi.remote.RemoteGroupPort;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addConnectionStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addControllerServiceStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addExpectedRemoteProcessGroupStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addInstanceStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addProcessorStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addReportingTaskStatus;
+import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addSystemDiagnosticStatus;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link StatusConfigReporter#getStatus}: each test stubs a mocked
+ * {@link FlowController} with fixture data, issues a status request string and
+ * compares the resulting {@link FlowStatusReport} with one built by the
+ * {@code StatusReportPopulator} helpers.
+ */
+public class StatusConfigReporterTest {
+    private FlowController mockFlowController;
+    private ProcessGroupStatus rootGroupStatus;
+    private BulletinRepository bulletinRepo;
+    private ProcessGroup processGroup;
+
+    /**
+     * Creates fresh mocks before every test and wires the root group, its status
+     * and the bulletin repository into the mocked flow controller.
+     */
+    @Before
+    public void setup() {
+        mockFlowController = mock(FlowController.class);
+        rootGroupStatus = mock(ProcessGroupStatus.class);
+        bulletinRepo = mock(BulletinRepository.class);
+        processGroup = mock(ProcessGroup.class);
+
+        when(mockFlowController.getRootGroupId()).thenReturn("root");
+        when(mockFlowController.getGroupStatus("root")).thenReturn(rootGroupStatus);
+        when(mockFlowController.getControllerStatus()).thenReturn(rootGroupStatus);
+        when(mockFlowController.getBulletinRepository()).thenReturn(bulletinRepo);
+        when(mockFlowController.getGroup(mockFlowController.getRootGroupId())).thenReturn(processGroup);
+    }
+
+    @Test
+    public void processorStatusHealth() throws Exception {
+        populateProcessor(false, false);
+
+        FlowStatusReport actual = getStatus("processor:all:health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addProcessorStatus(expected, true, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void processorStatusWithValidationErrors() throws Exception {
+        populateProcessor(true, false);
+
+        FlowStatusReport actual = getStatus("processor:all:health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addProcessorStatus(expected, true, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void processorStatusAll() throws Exception {
+        populateProcessor(true, true);
+
+        FlowStatusReport actual = getStatus("processor:all:health, stats, bulletins");
+
+        FlowStatusReport expected = newExpectedReport();
+        addProcessorStatus(expected, true, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void connectionStatusHealth() throws Exception {
+        populateConnection();
+
+        FlowStatusReport actual = getStatus("connection:all:health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addConnectionStatus(expected, true, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void connectionStatusAll() throws Exception {
+        populateConnection();
+
+        FlowStatusReport actual = getStatus("connection:all:health, stats");
+
+        FlowStatusReport expected = newExpectedReport();
+        addConnectionStatus(expected, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void connectionAndProcessorStatusHealth() throws Exception {
+        populateConnection();
+        populateProcessor(false, false);
+
+        FlowStatusReport actual = getStatus("connection:connectionId:health; processor:UpdateAttributeProcessorId:health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addConnectionStatus(expected, true, false);
+        addProcessorStatus(expected, true, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void provenanceReportingTaskStatusHealth() throws Exception {
+        populateReportingTask(false, false);
+
+        FlowStatusReport actual = getStatus("provenanceReporting:health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addReportingTaskStatus(expected, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void provenanceReportingTaskStatusBulletins() throws Exception {
+        populateReportingTask(true, false);
+
+        FlowStatusReport actual = getStatus("provenanceReporting:bulletins");
+
+        FlowStatusReport expected = newExpectedReport();
+        addReportingTaskStatus(expected, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void provenanceReportingTaskStatusAll() throws Exception {
+        populateReportingTask(true, true);
+
+        FlowStatusReport actual = getStatus("provenanceReporting:health,bulletins");
+
+        FlowStatusReport expected = newExpectedReport();
+        addReportingTaskStatus(expected, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticHeap() throws Exception {
+        populateSystemDiagnostics();
+
+        FlowStatusReport actual = getStatus("systemDiagnostics:heap");
+
+        FlowStatusReport expected = newExpectedReport();
+        addSystemDiagnosticStatus(expected, true, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticProcessorStats() throws Exception {
+        populateSystemDiagnostics();
+
+        FlowStatusReport actual = getStatus("systemDiagnostics:processorStats");
+
+        FlowStatusReport expected = newExpectedReport();
+        addSystemDiagnosticStatus(expected, false, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticFlowFileRepo() throws Exception {
+        populateSystemDiagnostics();
+
+        FlowStatusReport actual = getStatus("systemDiagnostics:flowfilerepositoryusage");
+
+        FlowStatusReport expected = newExpectedReport();
+        addSystemDiagnosticStatus(expected, false, false, true, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticContentRepo() throws Exception {
+        populateSystemDiagnostics();
+
+        FlowStatusReport actual = getStatus("systemDiagnostics:contentrepositoryusage");
+
+        FlowStatusReport expected = newExpectedReport();
+        addSystemDiagnosticStatus(expected, false, false, false, true, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticGarbageCollection() throws Exception {
+        populateSystemDiagnostics();
+
+        FlowStatusReport actual = getStatus("systemDiagnostics:garbagecollection");
+
+        FlowStatusReport expected = newExpectedReport();
+        addSystemDiagnosticStatus(expected, false, false, false, false, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void systemDiagnosticAll() throws Exception {
+        populateSystemDiagnostics();
+
+        FlowStatusReport actual = getStatus("systemDiagnostics:garbagecollection, heap, processorstats, contentrepositoryusage, flowfilerepositoryusage");
+
+        FlowStatusReport expected = newExpectedReport();
+        addSystemDiagnosticStatus(expected, true, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void instanceStatusHealth() throws Exception {
+        populateInstance(false);
+
+        FlowStatusReport actual = getStatus("instance:health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addInstanceStatus(expected, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void instanceStatusBulletins() throws Exception {
+        populateInstance(true);
+
+        FlowStatusReport actual = getStatus("instance:bulletins");
+
+        FlowStatusReport expected = newExpectedReport();
+        addInstanceStatus(expected, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void instanceStatusStats() throws Exception {
+        populateInstance(false);
+
+        FlowStatusReport actual = getStatus("instance:stats");
+
+        FlowStatusReport expected = newExpectedReport();
+        addInstanceStatus(expected, false, true, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void instanceStatusAll() throws Exception {
+        populateInstance(true);
+
+        FlowStatusReport actual = getStatus("instance:stats, bulletins, health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addInstanceStatus(expected, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void controllerServiceStatusHealth() throws Exception {
+        populateControllerService(false, false);
+
+        FlowStatusReport actual = getStatus("controllerServices:health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addControllerServiceStatus(expected, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void controllerServiceStatusBulletins() throws Exception {
+        populateControllerService(false, true);
+
+        FlowStatusReport actual = getStatus("controllerServices:bulletins");
+
+        FlowStatusReport expected = newExpectedReport();
+        addControllerServiceStatus(expected, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void controllerServiceStatusAll() throws Exception {
+        populateControllerService(true, true);
+
+        FlowStatusReport actual = getStatus("controllerServices:bulletins, health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addControllerServiceStatus(expected, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusHealth() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        FlowStatusReport actual = getStatus("remoteProcessGroup:all:health");
+
+        FlowStatusReport expected = newExpectedReport();
+        addExpectedRemoteProcessGroupStatus(expected, true, false, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusBulletins() throws Exception {
+        populateRemoteProcessGroup(true, false);
+
+        FlowStatusReport actual = getStatus("remoteProcessGroup:all:bulletins");
+
+        FlowStatusReport expected = newExpectedReport();
+        addExpectedRemoteProcessGroupStatus(expected, false, false, false, false, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusInputPorts() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        FlowStatusReport actual = getStatus("remoteProcessGroup:all:inputPorts");
+
+        FlowStatusReport expected = newExpectedReport();
+        addExpectedRemoteProcessGroupStatus(expected, false, true, false, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusOutputPorts() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        FlowStatusReport actual = getStatus("remoteProcessGroup:all:outputPorts");
+
+        FlowStatusReport expected = newExpectedReport();
+        addExpectedRemoteProcessGroupStatus(expected, false, false, true, false, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusStats() throws Exception {
+        populateRemoteProcessGroup(false, false);
+
+        FlowStatusReport actual = getStatus("remoteProcessGroup:all:stats");
+
+        FlowStatusReport expected = newExpectedReport();
+        addExpectedRemoteProcessGroupStatus(expected, false, false, false, true, false, false);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void remoteProcessGroupStatusAll() throws Exception {
+        populateRemoteProcessGroup(true, true);
+
+        FlowStatusReport actual = getStatus("remoteProcessGroup:all:health, bulletins, inputPorts, outputPorts, stats");
+
+        FlowStatusReport expected = newExpectedReport();
+        addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, true);
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void statusEverything() throws Exception {
+        when(bulletinRepo.findBulletins(anyObject())).thenReturn(Collections.emptyList());
+
+        populateControllerService(true, false);
+        populateInstance(true);
+        populateSystemDiagnostics();
+        populateReportingTask(false, true);
+        populateConnection();
+        populateProcessor(true, false);
+        populateRemoteProcessGroup(false, true);
+
+        String statusRequest = "controllerServices:bulletins,health; processor:all:health,stats,bulletins; instance:bulletins,health,stats ; systemDiagnostics:garbagecollection, heap, " +
+                "processorstats, contentrepositoryusage, flowfilerepositoryusage; connection:all:health,stats; provenanceReporting:health,bulletins; remoteProcessGroup:all:health, " +
+                "bulletins, inputPorts, outputPorts, stats";
+
+        FlowStatusReport actual = getStatus(statusRequest);
+
+        FlowStatusReport expected = newExpectedReport();
+        addControllerServiceStatus(expected, true, true, true, false);
+        addInstanceStatus(expected, true, true, true, true);
+        addSystemDiagnosticStatus(expected, true, true, true, true, true);
+        addReportingTaskStatus(expected, true, true, true, false);
+        addConnectionStatus(expected, true, true);
+        addProcessorStatus(expected, true, true, true, true, false);
+        addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, false);
+
+        assertEquals(expected, actual);
+    }
+
+
+    /***************************
+     * Shared test helpers
+     *************************/
+
+    /**
+     * Runs {@code statusRequest} through {@link StatusConfigReporter#getStatus}
+     * against the mocked flow controller, using this class's logger.
+     */
+    private FlowStatusReport getStatus(String statusRequest) throws Exception {
+        return StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(StatusConfigReporterTest.class));
+    }
+
+    /**
+     * Creates an expected report whose error list is empty; callers add the
+     * sections they expect via the {@code StatusReportPopulator} helpers.
+     */
+    private FlowStatusReport newExpectedReport() {
+        FlowStatusReport expected = new FlowStatusReport();
+        expected.setErrorsGeneratingReport(Collections.emptyList());
+        return expected;
+    }
+
+    /**
+     * Builds the two-element validation error fixture shared by every populator.
+     */
+    private List<ValidationResult> buildValidationErrors() {
+        ValidationResult validationResult = new ValidationResult.Builder()
+                .input("input")
+                .subject("subject")
+                .explanation("is not valid")
+                .build();
+
+        ValidationResult validationResult2 = new ValidationResult.Builder()
+                .input("input2")
+                .subject("subject2")
+                .explanation("is not valid too")
+                .build();
+
+        List<ValidationResult> validationResultList = new ArrayList<>();
+        validationResultList.add(validationResult);
+        validationResultList.add(validationResult2);
+        return validationResultList;
+    }
+
+    /**
+     * Builds a one-element list holding a mocked bulletin with a fixed timestamp
+     * and the given message. Must be called outside of any pending
+     * {@code when(...)} stubbing because it stubs the bulletin mock itself.
+     */
+    private List<Bulletin> singleBulletinList(String message) {
+        Bulletin bulletin = mock(Bulletin.class);
+        when(bulletin.getTimestamp()).thenReturn(new Date(1464019245000L));
+        when(bulletin.getMessage()).thenReturn(message);
+
+        List<Bulletin> bulletinList = new ArrayList<>();
+        bulletinList.add(bulletin);
+        return bulletinList;
+    }
+
+
+    /***************************
+     * Populator methods
+     *************************/
+
+    /** Makes the bulletin repository return one controller-level bulletin. */
+    private void addBulletinsToInstance() {
+        List<Bulletin> bulletinList = singleBulletinList("Bulletin message");
+
+        when(bulletinRepo.findBulletinsForController()).thenReturn(bulletinList);
+    }
+
+    /** Stubs the flow controller with fully populated system diagnostics. */
+    private void populateSystemDiagnostics() {
+        SystemDiagnostics systemDiagnostics = new SystemDiagnostics();
+        addGarbageCollectionToSystemDiagnostics(systemDiagnostics);
+        addHeapSystemDiagnostics(systemDiagnostics);
+        addContentRepoToSystemDiagnostics(systemDiagnostics);
+        addFlowFileRepoToSystemDiagnostics(systemDiagnostics);
+        addProcessorInfoToSystemDiagnostics(systemDiagnostics);
+        when(mockFlowController.getSystemDiagnostics()).thenReturn(systemDiagnostics);
+    }
+
+    /**
+     * Registers a single mocked controller service with the flow controller.
+     *
+     * @param validationErrors whether the service reports the validation error fixture
+     * @param addBulletins whether a bulletin is registered for the service's identifier
+     */
+    private void populateControllerService(boolean validationErrors, boolean addBulletins) {
+        ControllerServiceNode controllerServiceNode = mock(ControllerServiceNode.class);
+        addControllerServiceHealth(controllerServiceNode);
+        if (validationErrors) {
+            addValidationErrors(controllerServiceNode);
+        }
+
+        if (addBulletins) {
+            addBulletins("Bulletin message", controllerServiceNode.getIdentifier());
+        }
+        HashSet<ControllerServiceNode> controllerServiceNodes = new HashSet<>();
+        controllerServiceNodes.add(controllerServiceNode);
+        when(mockFlowController.getAllControllerServices()).thenReturn(controllerServiceNodes);
+    }
+
+    /**
+     * Stubs the root group status counters and, optionally, a controller-level bulletin.
+     */
+    private void populateInstance(boolean addBulletins) {
+        setRootGroupStatusVariables();
+        if (addBulletins) {
+            addBulletinsToInstance();
+        }
+    }
+
+    /**
+     * Registers a single mocked "ReportProvenance" reporting task with the flow controller.
+     *
+     * @param addBulletins whether a bulletin is registered for the task's identifier
+     * @param validationErrors whether the task reports the validation error fixture
+     */
+    private void populateReportingTask(boolean addBulletins, boolean validationErrors) {
+        if (addBulletins) {
+            addBulletins("Bulletin message", "ReportProvenance");
+        }
+
+        ReportingTaskNode reportingTaskNode = mock(ReportingTaskNode.class);
+        addReportingTaskNodeVariables(reportingTaskNode);
+
+        HashSet<ReportingTaskNode> reportingTaskNodes = new HashSet<>();
+        reportingTaskNodes.add(reportingTaskNode);
+
+        when(mockFlowController.getAllReportingTasks()).thenReturn(reportingTaskNodes);
+
+        if (validationErrors) {
+            List<ValidationResult> validationResultList = buildValidationErrors();
+            when(reportingTaskNode.getValidationErrors()).thenReturn(validationResultList);
+        } else {
+            when(reportingTaskNode.getValidationErrors()).thenReturn(Collections.emptyList());
+        }
+    }
+
+    /** Stubs the root group status with a single connection named "connectionId". */
+    private void populateConnection() {
+        ConnectionStatus connectionStatus = new ConnectionStatus();
+        connectionStatus.setQueuedBytes(100);
+        connectionStatus.setId("connectionId");
+        connectionStatus.setName("connectionId");
+        connectionStatus.setQueuedCount(10);
+        connectionStatus.setInputCount(1);
+        connectionStatus.setInputBytes(2);
+        connectionStatus.setOutputCount(3);
+        connectionStatus.setOutputBytes(4);
+
+        Collection<ConnectionStatus> statusCollection = new ArrayList<>();
+        statusCollection.add(connectionStatus);
+
+        when(rootGroupStatus.getConnectionStatus()).thenReturn(statusCollection);
+    }
+
+    /**
+     * Stubs the root group with a single UpdateAttribute processor status and a
+     * matching mocked processor node.
+     *
+     * @param validationErrors whether the processor node reports the validation error fixture
+     * @param addBulletins whether a bulletin is registered for the processor's identifier
+     */
+    private void populateProcessor(boolean validationErrors, boolean addBulletins) {
+        if (addBulletins) {
+            addBulletins("Bulletin message", "UpdateAttributeProcessorId");
+        }
+
+        ProcessorStatus processorStatus = new ProcessorStatus();
+        processorStatus.setType("org.apache.nifi.processors.attributes.UpdateAttribute");
+        processorStatus.setId("UpdateAttributeProcessorId");
+        processorStatus.setName("UpdateAttributeProcessorId");
+        processorStatus.setRunStatus(RunStatus.Stopped);
+        processorStatus.setActiveThreadCount(1);
+        processorStatus.setFlowFilesReceived(2);
+        processorStatus.setBytesRead(3);
+        processorStatus.setBytesWritten(4);
+        processorStatus.setFlowFilesSent(5);
+        processorStatus.setInvocations(6);
+        processorStatus.setProcessingNanos(7);
+
+        Collection<ProcessorStatus> statusCollection = new ArrayList<>();
+        statusCollection.add(processorStatus);
+
+        when(rootGroupStatus.getProcessorStatus()).thenReturn(statusCollection);
+
+        // A single stub per processor id is enough: a second when(...) on the same
+        // call would simply replace the first one.
+        ProcessorNode processorNode = mock(ProcessorNode.class);
+        when(processGroup.getProcessor(processorStatus.getId())).thenReturn(processorNode);
+
+        if (validationErrors) {
+            List<ValidationResult> validationResultList = buildValidationErrors();
+            when(processorNode.getValidationErrors()).thenReturn(validationResultList);
+        } else {
+            when(processorNode.getValidationErrors()).thenReturn(Collections.emptyList());
+        }
+    }
+
+    /**
+     * Stubs the root group with one remote process group ("rpg1") that exposes a
+     * single input port and a single output port.
+     *
+     * @param addBulletins whether a bulletin is registered for the group's identifier
+     * @param addAuthIssues currently unused; kept so existing call sites stay unchanged
+     */
+    private void populateRemoteProcessGroup(boolean addBulletins, boolean addAuthIssues) {
+        when(mockFlowController.getGroup(mockFlowController.getRootGroupId())).thenReturn(processGroup);
+
+        RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
+        when(processGroup.getRemoteProcessGroup(any())).thenReturn(remoteProcessGroup);
+
+        RemoteGroupPort inputPort = mock(RemoteGroupPort.class);
+        when(inputPort.getName()).thenReturn("inputPort");
+        when(inputPort.getTargetExists()).thenReturn(true);
+        when(inputPort.isTargetRunning()).thenReturn(false);
+
+        when(remoteProcessGroup.getInputPorts()).thenReturn(Collections.singleton(inputPort));
+
+        RemoteGroupPort outputPort = mock(RemoteGroupPort.class);
+        when(outputPort.getName()).thenReturn("outputPort");
+        when(outputPort.getTargetExists()).thenReturn(true);
+        when(outputPort.isTargetRunning()).thenReturn(false);
+
+        when(remoteProcessGroup.getOutputPorts()).thenReturn(Collections.singleton(outputPort));
+
+        RemoteProcessGroupStatus remoteProcessGroupStatus = new RemoteProcessGroupStatus();
+        addRemoteProcessGroupStatus(remoteProcessGroupStatus);
+        if (addBulletins) {
+            addBulletins("Bulletin message", remoteProcessGroupStatus.getId());
+        }
+        when(rootGroupStatus.getRemoteProcessGroupStatus()).thenReturn(Collections.singletonList(remoteProcessGroupStatus));
+    }
+
+
+    /** Stubs the counters read from the root group status for instance reports. */
+    private void setRootGroupStatusVariables() {
+        when(rootGroupStatus.getQueuedContentSize()).thenReturn(1L);
+        when(rootGroupStatus.getQueuedCount()).thenReturn(2);
+        when(rootGroupStatus.getActiveThreadCount()).thenReturn(3);
+        when(rootGroupStatus.getBytesRead()).thenReturn(1L);
+        when(rootGroupStatus.getBytesWritten()).thenReturn(2L);
+        when(rootGroupStatus.getBytesSent()).thenReturn(3L);
+        when(rootGroupStatus.getFlowFilesSent()).thenReturn(4);
+        when(rootGroupStatus.getBytesTransferred()).thenReturn(5L);
+        when(rootGroupStatus.getFlowFilesTransferred()).thenReturn(6);
+        when(rootGroupStatus.getBytesReceived()).thenReturn(7L);
+        when(rootGroupStatus.getFlowFilesReceived()).thenReturn(8);
+    }
+
+    /** Adds a single garbage collector entry named "garbage 1". */
+    private void addGarbageCollectionToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        Map<String, GarbageCollection> garbageCollectionMap = new HashMap<>();
+
+        GarbageCollection garbageCollection1 = new GarbageCollection();
+        garbageCollection1.setCollectionCount(1);
+        garbageCollection1.setCollectionTime(10);
+        garbageCollection1.setName("garbage 1");
+        garbageCollectionMap.put(garbageCollection1.getName(), garbageCollection1);
+
+        systemDiagnostics.setGarbageCollection(garbageCollectionMap);
+    }
+
+    /** Adds a single content repository usage entry named "Content repo1". */
+    private void addContentRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        Map<String, StorageUsage> stringStorageUsageMap = new HashMap<>();
+
+        StorageUsage repoUsage1 = new StorageUsage();
+        repoUsage1.setFreeSpace(30);
+        repoUsage1.setTotalSpace(100);
+        repoUsage1.setIdentifier("Content repo1");
+        stringStorageUsageMap.put(repoUsage1.getIdentifier(), repoUsage1);
+
+        systemDiagnostics.setContentRepositoryStorageUsage(stringStorageUsageMap);
+    }
+
+    /** Sets the FlowFile repository usage entry named "FlowFile repo". */
+    private void addFlowFileRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        StorageUsage repoUsage = new StorageUsage();
+        repoUsage.setFreeSpace(30);
+        repoUsage.setTotalSpace(100);
+        repoUsage.setIdentifier("FlowFile repo");
+        systemDiagnostics.setFlowFileRepositoryStorageUsage(repoUsage);
+    }
+
+    /** Sets the heap and non-heap figures. */
+    private void addHeapSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        systemDiagnostics.setMaxHeap(5);
+        systemDiagnostics.setTotalHeap(3);
+        systemDiagnostics.setUsedHeap(2);
+        systemDiagnostics.setMaxNonHeap(9);
+        systemDiagnostics.setTotalNonHeap(8);
+        systemDiagnostics.setUsedNonHeap(6);
+    }
+
+    /** Sets the processor load average and the available processor count. */
+    private void addProcessorInfoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
+        systemDiagnostics.setProcessorLoadAverage(80.9);
+        systemDiagnostics.setAvailableProcessors(5);
+    }
+
+    /** Stubs name, identifier, state and an empty validation error list on the service mock. */
+    private void addControllerServiceHealth(ControllerServiceNode controllerServiceNode) {
+        when(controllerServiceNode.getName()).thenReturn("mockControllerService");
+        when(controllerServiceNode.getIdentifier()).thenReturn("mockControllerService");
+        when(controllerServiceNode.getState()).thenReturn(ControllerServiceState.ENABLED);
+        when(controllerServiceNode.getValidationErrors()).thenReturn(Collections.emptyList());
+    }
+
+    /** Stubs the reporting task mock as a running "ReportProvenance" task without validation errors. */
+    private void addReportingTaskNodeVariables(ReportingTaskNode reportingTaskNode) {
+        when(reportingTaskNode.getValidationErrors()).thenReturn(Collections.emptyList());
+        when(reportingTaskNode.getActiveThreadCount()).thenReturn(1);
+        when(reportingTaskNode.getScheduledState()).thenReturn(ScheduledState.RUNNING);
+        when(reportingTaskNode.getIdentifier()).thenReturn("ReportProvenance");
+        when(reportingTaskNode.getName()).thenReturn("ReportProvenance");
+    }
+
+    /** Fills the remote process group status fixture for "rpg1". */
+    private void addRemoteProcessGroupStatus(RemoteProcessGroupStatus remoteProcessGroupStatus) {
+        remoteProcessGroupStatus.setName("rpg1");
+        remoteProcessGroupStatus.setId("rpg1");
+        remoteProcessGroupStatus.setTransmissionStatus(TransmissionStatus.Transmitting);
+        remoteProcessGroupStatus.setActiveRemotePortCount(1);
+        remoteProcessGroupStatus.setInactiveRemotePortCount(2);
+
+        remoteProcessGroupStatus.setActiveThreadCount(3);
+        remoteProcessGroupStatus.setSentContentSize(4L);
+        remoteProcessGroupStatus.setSentCount(5);
+    }
+
+    /**
+     * Makes the bulletin repository answer queries whose source id pattern equals
+     * {@code sourceId} with a single bulletin carrying {@code message}.
+     * Each call replaces the previous {@code findBulletins} stubbing, so only the
+     * most recently registered source id yields bulletins.
+     */
+    private void addBulletins(String message, String sourceId) {
+        List<Bulletin> bulletinList = singleBulletinList(message);
+
+        BulletinQueryAnswer bulletinQueryAnswer = new BulletinQueryAnswer(sourceId, bulletinList);
+        when(bulletinRepo.findBulletins(anyObject())).then(bulletinQueryAnswer);
+    }
+
+    /** Makes the given component mock report the validation error fixture. */
+    private void addValidationErrors(ConfiguredComponent connectable) {
+        List<ValidationResult> validationResultList = buildValidationErrors();
+        when(connectable.getValidationErrors()).thenReturn(validationResultList);
+    }
+
+    /**
+     * Mockito answer that returns the configured bulletins only when the query's
+     * source id pattern matches the expected id, and an empty list otherwise.
+     * Static because it never touches the enclosing test instance.
+     */
+    private static class BulletinQueryAnswer implements Answer<List<Bulletin>> {
+
+        private final List<Bulletin> bulletinList;
+        private final String idToMatch;
+
+        private BulletinQueryAnswer(String idToMatch, List<Bulletin> bulletinList) {
+            this.idToMatch = idToMatch;
+            this.bulletinList = bulletinList;
+        }
+
+        @Override
+        public List<Bulletin> answer(InvocationOnMock invocationOnMock) throws Throwable {
+            BulletinQuery bulletinQuery = (BulletinQuery) invocationOnMock.getArguments()[0];
+            if (idToMatch.equals(bulletinQuery.getSourceIdPattern().toString())) {
+                return bulletinList;
+            }
+            return Collections.emptyList();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi-minifi/blob/b059afef/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java
----------------------------------------------------------------------
diff --git a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java b/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java
deleted file mode 100644
index ee2f115..0000000
--- a/minifi-nar-bundles/minifi-framework-bundle/minifi-framework/minifi-framework-core/src/test/java/org/apache/nifi/minifi/status/TestStatusConfigReporter.java
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.minifi.status;
-
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.controller.ConfiguredComponent;
-import org.apache.nifi.controller.FlowController;
-import org.apache.nifi.controller.ProcessorNode;
-import org.apache.nifi.controller.ReportingTaskNode;
-import org.apache.nifi.controller.ScheduledState;
-import org.apache.nifi.controller.service.ControllerServiceNode;
-import org.apache.nifi.controller.service.ControllerServiceState;
-import org.apache.nifi.controller.status.ConnectionStatus;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.controller.status.ProcessorStatus;
-import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
-import org.apache.nifi.controller.status.RunStatus;
-import org.apache.nifi.controller.status.TransmissionStatus;
-import org.apache.nifi.diagnostics.GarbageCollection;
-import org.apache.nifi.diagnostics.StorageUsage;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
-import org.apache.nifi.groups.ProcessGroup;
-import org.apache.nifi.groups.RemoteProcessGroup;
-import org.apache.nifi.minifi.commons.status.FlowStatusReport;
-import org.apache.nifi.remote.RemoteGroupPort;
-import org.apache.nifi.reporting.Bulletin;
-import org.apache.nifi.reporting.BulletinQuery;
-import org.apache.nifi.reporting.BulletinRepository;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addConnectionStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addControllerServiceStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addExpectedRemoteProcessGroupStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addInstanceStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addProcessorStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addReportingTaskStatus;
-import static org.apache.nifi.minifi.commons.status.util.StatusReportPopulator.addSystemDiagnosticStatus;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestStatusConfigReporter {
- private FlowController mockFlowController;
- private ProcessGroupStatus rootGroupStatus;
- private BulletinRepository bulletinRepo;
- private ProcessGroup processGroup;
-
- @Before
- public void setup() {
- mockFlowController = mock(FlowController.class);
- rootGroupStatus = mock(ProcessGroupStatus.class);
- bulletinRepo = mock(BulletinRepository.class);
- processGroup = mock(ProcessGroup.class);
-
- when(mockFlowController.getRootGroupId()).thenReturn("root");
- when(mockFlowController.getGroupStatus("root")).thenReturn(rootGroupStatus);
- when(mockFlowController.getControllerStatus()).thenReturn(rootGroupStatus);
- when(mockFlowController.getBulletinRepository()).thenReturn(bulletinRepo);
- when(mockFlowController.getGroup(mockFlowController.getRootGroupId())).thenReturn(processGroup);
- }
-
- @Test
- public void processorStatusHealth() throws Exception {
- populateProcessor(false, false);
-
- String statusRequest = "processor:all:health";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addProcessorStatus(expected, true, false, false, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void processorStatusWithValidationErrors() throws Exception {
- populateProcessor(true, false);
-
- String statusRequest = "processor:all:health";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addProcessorStatus(expected, true, true, false, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void processorStatusAll() throws Exception {
- populateProcessor(true, true);
-
- String statusRequest = "processor:all:health, stats, bulletins";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addProcessorStatus(expected, true, true, true, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void connectionStatusHealth() throws Exception {
- populateConnection();
-
- String statusRequest = "connection:all:health";
- FlowStatusReport status = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addConnectionStatus(expected, true, false);
-
- assertEquals(expected, status);
- }
-
-
- @Test
- public void connectionStatusAll() throws Exception {
- populateConnection();
-
- String statusRequest = "connection:all:health, stats";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
- addConnectionStatus(expected, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void connectionAndProcessorStatusHealth() throws Exception {
-
- populateConnection();
-
- populateProcessor(false, false);
-
- String statusRequest = "connection:connectionId:health; processor:UpdateAttributeProcessorId:health";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- addConnectionStatus(expected, true, false);
-
- addProcessorStatus(expected, true, false, false, false, false);
-
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void provenanceReportingTaskStatusHealth() throws Exception {
- populateReportingTask(false, false);
-
- String statusRequest = "provenanceReporting:health";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
- addReportingTaskStatus(expected, true, false, false, false);
-
- assertEquals(expected, actual);
- }
-
-
- @Test
- public void provenanceReportingTaskStatusBulletins() throws Exception {
- populateReportingTask(true, false);
-
- String statusRequest = "provenanceReporting:bulletins";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addReportingTaskStatus(expected, false, false, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void provenanceReportingTaskStatusAll() throws Exception {
- populateReportingTask(true, true);
-
- String statusRequest = "provenanceReporting:health,bulletins";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addReportingTaskStatus(expected, true, true, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void systemDiagnosticHeap() throws Exception {
- populateSystemDiagnostics();
-
- String statusRequest = "systemDiagnostics:heap";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addSystemDiagnosticStatus(expected, true, false, false, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void systemDiagnosticProcessorStats() throws Exception {
- populateSystemDiagnostics();
-
- String statusRequest = "systemDiagnostics:processorStats";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addSystemDiagnosticStatus(expected, false, true, false, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void systemDiagnosticFlowFileRepo() throws Exception {
- populateSystemDiagnostics();
-
- String statusRequest = "systemDiagnostics:flowfilerepositoryusage";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addSystemDiagnosticStatus(expected, false, false, true, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void systemDiagnosticContentRepo() throws Exception {
- populateSystemDiagnostics();
-
- String statusRequest = "systemDiagnostics:contentrepositoryusage";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addSystemDiagnosticStatus(expected, false, false, false, true, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void systemDiagnosticGarbageCollection() throws Exception {
- populateSystemDiagnostics();
-
- String statusRequest = "systemDiagnostics:garbagecollection";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addSystemDiagnosticStatus(expected, false, false, false, false, true);
-
- assertEquals(expected, actual);
- }
-
-
- @Test
- public void systemDiagnosticAll() throws Exception {
- populateSystemDiagnostics();
-
- String statusRequest = "systemDiagnostics:garbagecollection, heap, processorstats, contentrepositoryusage, flowfilerepositoryusage";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addSystemDiagnosticStatus(expected, true, true, true, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void instanceStatusHealth() throws Exception {
- populateInstance(false);
-
- String statusRequest = "instance:health";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
- addInstanceStatus(expected, true, false, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void instanceStatusBulletins() throws Exception {
- populateInstance(true);
-
- String statusRequest = "instance:bulletins";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addInstanceStatus(expected, false, false, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void instanceStatusStats() throws Exception {
- populateInstance(false);
-
- String statusRequest = "instance:stats";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addInstanceStatus(expected, false, true, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void instanceStatusAll() throws Exception {
- populateInstance(true);
-
- String statusRequest = "instance:stats, bulletins, health";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addInstanceStatus(expected, true, true, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void controllerServiceStatusHealth() throws Exception {
- populateControllerService(false, false);
-
- String statusRequest = "controllerServices:health";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addControllerServiceStatus(expected, true, false, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void controllerServiceStatusBulletins() throws Exception {
- populateControllerService(false, true);
-
- String statusRequest = "controllerServices:bulletins";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addControllerServiceStatus(expected, false, false, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void controllerServiceStatusAll() throws Exception {
- populateControllerService(true, true);
-
- String statusRequest = "controllerServices:bulletins, health";
-
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addControllerServiceStatus(expected, true, true, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void remoteProcessGroupStatusHealth() throws Exception {
- populateRemoteProcessGroup(false, false);
-
- String statusRequest = "remoteProcessGroup:all:health";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addExpectedRemoteProcessGroupStatus(expected, true, false, false, false, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void remoteProcessGroupStatusBulletins() throws Exception {
- populateRemoteProcessGroup(true, false);
-
- String statusRequest = "remoteProcessGroup:all:bulletins";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addExpectedRemoteProcessGroupStatus(expected, false, false, false, false, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void remoteProcessGroupStatusInputPorts() throws Exception {
- populateRemoteProcessGroup(false, false);
-
- String statusRequest = "remoteProcessGroup:all:inputPorts";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addExpectedRemoteProcessGroupStatus(expected, false, true, false, false, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void remoteProcessGroupStatusOutputPorts() throws Exception {
- populateRemoteProcessGroup(false, false);
-
- String statusRequest = "remoteProcessGroup:all:outputPorts";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addExpectedRemoteProcessGroupStatus(expected, false, false, true, false, false, false);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void remoteProcessGroupStatusStats() throws Exception {
- populateRemoteProcessGroup(false, false);
-
- String statusRequest = "remoteProcessGroup:all:stats";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addExpectedRemoteProcessGroupStatus(expected, false, false, false, true, false, false);
-
- assertEquals(expected, actual);
- }
-
-
- @Test
- public void remoteProcessGroupStatusAll() throws Exception {
- populateRemoteProcessGroup(true, true);
-
- String statusRequest = "remoteProcessGroup:all:health, bulletins, inputPorts, outputPorts, stats";
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, true);
-
- assertEquals(expected, actual);
- }
-
- @Test
- public void statusEverything() throws Exception {
- when(bulletinRepo.findBulletins(anyObject())).thenReturn(Collections.emptyList());
-
- populateControllerService(true, false);
- populateInstance(true);
- populateSystemDiagnostics();
- populateReportingTask(false, true);
- populateConnection();
- populateProcessor(true, false);
- populateRemoteProcessGroup(false, true);
-
- String statusRequest = "controllerServices:bulletins,health; processor:all:health,stats,bulletins; instance:bulletins,health,stats ; systemDiagnostics:garbagecollection, heap, " +
- "processorstats, contentrepositoryusage, flowfilerepositoryusage; connection:all:health,stats; provenanceReporting:health,bulletins; remoteProcessGroup:all:health, " +
- "bulletins, inputPorts, outputPorts, stats";
-
- FlowStatusReport actual = StatusConfigReporter.getStatus(mockFlowController, statusRequest, LoggerFactory.getLogger(TestStatusConfigReporter.class));
-
- FlowStatusReport expected = new FlowStatusReport();
- expected.setErrorsGeneratingReport(Collections.EMPTY_LIST);
-
- addControllerServiceStatus(expected, true, true, true, false);
- addInstanceStatus(expected, true, true, true, true);
- addSystemDiagnosticStatus(expected, true, true, true, true, true);
- addReportingTaskStatus(expected, true, true, true, false);
- addConnectionStatus(expected, true, true);
- addProcessorStatus(expected, true, true, true, true, false);
- addExpectedRemoteProcessGroupStatus(expected, true, true, true, true, true, false);
-
- assertEquals(expected, actual);
- }
-
-
- /***************************
- * Populator methods
- *************************/
-
- private void addBulletinsToInstance() {
- Bulletin bulletin = mock(Bulletin.class);
- when(bulletin.getTimestamp()).thenReturn(new Date(1464019245000L));
- when(bulletin.getMessage()).thenReturn("Bulletin message");
-
- List<Bulletin> bulletinList = new ArrayList<>();
- bulletinList.add(bulletin);
-
- when(bulletinRepo.findBulletinsForController()).thenReturn(bulletinList);
- }
-
- private void populateSystemDiagnostics() {
- SystemDiagnostics systemDiagnostics = new SystemDiagnostics();
- addGarbageCollectionToSystemDiagnostics(systemDiagnostics);
- addHeapSystemDiagnostics(systemDiagnostics);
- addContentRepoToSystemDiagnostics(systemDiagnostics);
- addFlowFileRepoToSystemDiagnostics(systemDiagnostics);
- addProcessorInfoToSystemDiagnostics(systemDiagnostics);
- when(mockFlowController.getSystemDiagnostics()).thenReturn(systemDiagnostics);
- }
-
- private void populateControllerService(boolean validationErrors, boolean addBulletins) {
- ControllerServiceNode controllerServiceNode = mock(ControllerServiceNode.class);
- addControllerServiceHealth(controllerServiceNode);
- if (validationErrors) {
- addValidationErrors(controllerServiceNode);
- }
-
- if (addBulletins) {
- addBulletins("Bulletin message", controllerServiceNode.getIdentifier());
- }
- HashSet<ControllerServiceNode> controllerServiceNodes = new HashSet<>();
- controllerServiceNodes.add(controllerServiceNode);
- when(mockFlowController.getAllControllerServices()).thenReturn(controllerServiceNodes);
- }
-
- private void populateInstance(boolean addBulletins) {
- setRootGroupStatusVariables();
- if (addBulletins) {
- addBulletinsToInstance();
- }
- }
-
- private void populateReportingTask(boolean addBulletins, boolean validationErrors) {
- if (addBulletins) {
- addBulletins("Bulletin message", "ReportProvenance");
- }
-
- ReportingTaskNode reportingTaskNode = mock(ReportingTaskNode.class);
- addReportingTaskNodeVariables(reportingTaskNode);
-
- HashSet<ReportingTaskNode> reportingTaskNodes = new HashSet<>();
- reportingTaskNodes.add(reportingTaskNode);
-
- when(mockFlowController.getAllReportingTasks()).thenReturn(reportingTaskNodes);
-
- if (validationErrors) {
- ValidationResult validationResult = new ValidationResult.Builder()
- .input("input")
- .subject("subject")
- .explanation("is not valid")
- .build();
-
- ValidationResult validationResult2 = new ValidationResult.Builder()
- .input("input2")
- .subject("subject2")
- .explanation("is not valid too")
- .build();
-
- List<ValidationResult> validationResultList = new ArrayList<>();
- validationResultList.add(validationResult);
- validationResultList.add(validationResult2);
-
- when(reportingTaskNode.getValidationErrors()).thenReturn(validationResultList);
- } else {
- when(reportingTaskNode.getValidationErrors()).thenReturn(Collections.EMPTY_LIST);
- }
- }
-
- private void populateConnection() {
- ConnectionStatus connectionStatus = new ConnectionStatus();
- connectionStatus.setQueuedBytes(100);
- connectionStatus.setId("connectionId");
- connectionStatus.setName("connectionId");
- connectionStatus.setQueuedCount(10);
- connectionStatus.setInputCount(1);
- connectionStatus.setInputBytes(2);
- connectionStatus.setOutputCount(3);
- connectionStatus.setOutputBytes(4);
-
- Collection<ConnectionStatus> statusCollection = new ArrayList<>();
- statusCollection.add(connectionStatus);
-
- when(rootGroupStatus.getConnectionStatus()).thenReturn(statusCollection);
- }
-
- private void populateProcessor(boolean validationErrors, boolean addBulletins) {
- if (addBulletins) {
- addBulletins("Bulletin message", "UpdateAttributeProcessorId");
- }
-
- ProcessorStatus processorStatus = new ProcessorStatus();
- processorStatus.setType("org.apache.nifi.processors.attributes.UpdateAttribute");
- processorStatus.setId("UpdateAttributeProcessorId");
- processorStatus.setName("UpdateAttributeProcessorId");
- processorStatus.setRunStatus(RunStatus.Stopped);
- processorStatus.setActiveThreadCount(1);
- processorStatus.setFlowFilesReceived(2);
- processorStatus.setBytesRead(3);
- processorStatus.setBytesWritten(4);
- processorStatus.setFlowFilesSent(5);
- processorStatus.setInvocations(6);
- processorStatus.setProcessingNanos(7);
-
- Collection<ProcessorStatus> statusCollection = new ArrayList<>();
- statusCollection.add(processorStatus);
-
- mockProcessorEmptyValidation(processorStatus.getId(), processGroup);
- when(rootGroupStatus.getProcessorStatus()).thenReturn(statusCollection);
-
- ProcessorNode processorNode = mock(ProcessorNode.class);
- when(processGroup.getProcessor(processorStatus.getId())).thenReturn(processorNode);
-
- if (validationErrors) {
- ValidationResult validationResult = new ValidationResult.Builder()
- .input("input")
- .subject("subject")
- .explanation("is not valid")
- .build();
-
- ValidationResult validationResult2 = new ValidationResult.Builder()
- .input("input2")
- .subject("subject2")
- .explanation("is not valid too")
- .build();
-
- List<ValidationResult> validationResultList = new ArrayList<>();
- validationResultList.add(validationResult);
- validationResultList.add(validationResult2);
-
- when(processorNode.getValidationErrors()).thenReturn(validationResultList);
- } else {
- when(processorNode.getValidationErrors()).thenReturn(Collections.EMPTY_LIST);
- }
- }
-
- private void populateRemoteProcessGroup(boolean addBulletins, boolean addAuthIssues) {
- when(mockFlowController.getGroup(mockFlowController.getRootGroupId())).thenReturn(processGroup);
-
- RemoteProcessGroup remoteProcessGroup = mock(RemoteProcessGroup.class);
- when(processGroup.getRemoteProcessGroup(any())).thenReturn(remoteProcessGroup);
-
- RemoteGroupPort remoteGroupPort = mock(RemoteGroupPort.class);
- when(remoteGroupPort.getName()).thenReturn("inputPort");
- when(remoteGroupPort.getTargetExists()).thenReturn(true);
- when(remoteGroupPort.isTargetRunning()).thenReturn(false);
-
- when(remoteProcessGroup.getInputPorts()).thenReturn(Collections.singleton(remoteGroupPort));
-
- remoteGroupPort = mock(RemoteGroupPort.class);
- when(remoteGroupPort.getName()).thenReturn("outputPort");
- when(remoteGroupPort.getTargetExists()).thenReturn(true);
- when(remoteGroupPort.isTargetRunning()).thenReturn(false);
-
- when(remoteProcessGroup.getOutputPorts()).thenReturn(Collections.singleton(remoteGroupPort));
-
- RemoteProcessGroupStatus remoteProcessGroupStatus = new RemoteProcessGroupStatus();
- addRemoteProcessGroupStatus(remoteProcessGroupStatus);
- if (addBulletins) {
- addBulletins("Bulletin message", remoteProcessGroupStatus.getId());
- }
- when(rootGroupStatus.getRemoteProcessGroupStatus()).thenReturn(Collections.singletonList(remoteProcessGroupStatus));
- }
-
-
- private void setRootGroupStatusVariables() {
- when(rootGroupStatus.getQueuedContentSize()).thenReturn(1L);
- when(rootGroupStatus.getQueuedCount()).thenReturn(2);
- when(rootGroupStatus.getActiveThreadCount()).thenReturn(3);
- when(rootGroupStatus.getBytesRead()).thenReturn(1L);
- when(rootGroupStatus.getBytesWritten()).thenReturn(2L);
- when(rootGroupStatus.getBytesSent()).thenReturn(3L);
- when(rootGroupStatus.getFlowFilesSent()).thenReturn(4);
- when(rootGroupStatus.getBytesTransferred()).thenReturn(5L);
- when(rootGroupStatus.getFlowFilesTransferred()).thenReturn(6);
- when(rootGroupStatus.getBytesReceived()).thenReturn(7L);
- when(rootGroupStatus.getFlowFilesReceived()).thenReturn(8);
- }
-
- private void addGarbageCollectionToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
- Map<String, GarbageCollection> garbageCollectionMap = new HashMap<>();
-
- GarbageCollection garbageCollection1 = new GarbageCollection();
- garbageCollection1.setCollectionCount(1);
- garbageCollection1.setCollectionTime(10);
- garbageCollection1.setName("garbage 1");
- garbageCollectionMap.put(garbageCollection1.getName(), garbageCollection1);
-
- systemDiagnostics.setGarbageCollection(garbageCollectionMap);
- }
-
- private void addContentRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
- Map<String, StorageUsage> stringStorageUsageMap = new HashMap<>();
-
- StorageUsage repoUsage1 = new StorageUsage();
- repoUsage1.setFreeSpace(30);
- repoUsage1.setTotalSpace(100);
- repoUsage1.setIdentifier("Content repo1");
- stringStorageUsageMap.put(repoUsage1.getIdentifier(), repoUsage1);
-
- systemDiagnostics.setContentRepositoryStorageUsage(stringStorageUsageMap);
- }
-
- private void addFlowFileRepoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
- StorageUsage repoUsage = new StorageUsage();
- repoUsage.setFreeSpace(30);
- repoUsage.setTotalSpace(100);
- repoUsage.setIdentifier("FlowFile repo");
- systemDiagnostics.setFlowFileRepositoryStorageUsage(repoUsage);
- }
-
- private void addHeapSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
- systemDiagnostics.setMaxHeap(5);
- systemDiagnostics.setTotalHeap(3);
- systemDiagnostics.setUsedHeap(2);
- systemDiagnostics.setMaxNonHeap(9);
- systemDiagnostics.setTotalNonHeap(8);
- systemDiagnostics.setUsedNonHeap(6);
- }
-
- private void addProcessorInfoToSystemDiagnostics(SystemDiagnostics systemDiagnostics) {
- systemDiagnostics.setProcessorLoadAverage(80.9);
- systemDiagnostics.setAvailableProcessors(5);
- }
-
- private void mockProcessorEmptyValidation(String id, ProcessGroup processGroup) {
- ProcessorNode processorNode = mock(ProcessorNode.class);
- when(processGroup.getProcessor(id)).thenReturn(processorNode);
- when(processorNode.getValidationErrors()).thenReturn(Collections.emptyList());
- }
-
- private void addControllerServiceHealth(ControllerServiceNode controllerServiceNode) {
- when(controllerServiceNode.getName()).thenReturn("mockControllerService");
- when(controllerServiceNode.getIdentifier()).thenReturn("mockControllerService");
- when(controllerServiceNode.getState()).thenReturn(ControllerServiceState.ENABLED);
- when(controllerServiceNode.getValidationErrors()).thenReturn(Collections.emptyList());
- }
-
- private void addReportingTaskNodeVariables(ReportingTaskNode reportingTaskNode) {
- when(reportingTaskNode.getValidationErrors()).thenReturn(Collections.emptyList());
- when(reportingTaskNode.getActiveThreadCount()).thenReturn(1);
- when(reportingTaskNode.getScheduledState()).thenReturn(ScheduledState.RUNNING);
- when(reportingTaskNode.getIdentifier()).thenReturn("ReportProvenance");
- when(reportingTaskNode.getName()).thenReturn("ReportProvenance");
-
- }
-
- private void addRemoteProcessGroupStatus(RemoteProcessGroupStatus remoteProcessGroupStatus) {
- remoteProcessGroupStatus.setName("rpg1");
- remoteProcessGroupStatus.setId("rpg1");
- remoteProcessGroupStatus.setTransmissionStatus(TransmissionStatus.Transmitting);
- remoteProcessGroupStatus.setActiveRemotePortCount(1);
- remoteProcessGroupStatus.setInactiveRemotePortCount(2);
-
- remoteProcessGroupStatus.setActiveThreadCount(3);
- remoteProcessGroupStatus.setSentContentSize(4L);
- remoteProcessGroupStatus.setSentCount(5);
- }
-
- private void addBulletins(String message, String sourceId) {
- Bulletin bulletin = mock(Bulletin.class);
- when(bulletin.getTimestamp()).thenReturn(new Date(1464019245000L));
- when(bulletin.getMessage()).thenReturn(message);
-
- List<Bulletin> bulletinList = new ArrayList<>();
- bulletinList.add(bulletin);
-
- BulletinQueryAnswer bulletinQueryAnswer = new BulletinQueryAnswer(sourceId, bulletinList);
- when(bulletinRepo.findBulletins(anyObject())).then(bulletinQueryAnswer);
- }
-
- private void addValidationErrors(ConfiguredComponent connectable) {
- ValidationResult validationResult = new ValidationResult.Builder()
- .input("input")
- .subject("subject")
- .explanation("is not valid")
- .build();
-
- ValidationResult validationResult2 = new ValidationResult.Builder()
- .input("input2")
- .subject("subject2")
- .explanation("is not valid too")
- .build();
-
- List<ValidationResult> validationResultList = new ArrayList<>();
- validationResultList.add(validationResult);
- validationResultList.add(validationResult2);
- when(connectable.getValidationErrors()).thenReturn(validationResultList);
- }
-
- /**
-  * Mockito answer for bulletin lookups: returns the configured bulletin list only when
-  * the query's source-id pattern, rendered with {@code toString()}, equals the configured
-  * id; otherwise returns an empty list.
-  *
-  * <p>Declared static because it never touches the enclosing test instance, and typed as
-  * {@code Answer<List<Bulletin>>} rather than the raw {@code Answer}.</p>
-  */
- private static class BulletinQueryAnswer implements Answer<List<Bulletin>> {
-
-     private final List<Bulletin> bulletinList;
-     private final String idToMatch;
-
-     /**
-      * @param idToMatch    source id that a query's pattern text must equal to get a match
-      * @param bulletinList bulletins handed back on a match (returned as-is, not copied)
-      */
-     private BulletinQueryAnswer(String idToMatch, List<Bulletin> bulletinList) {
-         this.idToMatch = idToMatch;
-         this.bulletinList = bulletinList;
-     }
-
-     /**
-      * @param invocationOnMock the intercepted call; its first argument is cast to
-      *                         {@code BulletinQuery}
-      * @return the configured bulletins on a match, otherwise an empty list
-      */
-     @Override
-     public List<Bulletin> answer(InvocationOnMock invocationOnMock) {
-         final BulletinQuery bulletinQuery = (BulletinQuery) invocationOnMock.getArguments()[0];
-
-         // The stub is registered with a match-anything matcher, so guard against a null
-         // query or a query without a source-id pattern and treat both as "no match".
-         // NOTE(review): assumes getSourceIdPattern() may return null when unset - confirm.
-         if (bulletinQuery == null || bulletinQuery.getSourceIdPattern() == null) {
-             return Collections.emptyList();
-         }
-
-         if (bulletinQuery.getSourceIdPattern().toString().equals(idToMatch)) {
-             return bulletinList;
-         }
-         return Collections.emptyList();
-     }
- }
-}