You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/04/22 23:13:20 UTC
[30/49] incubator-nifi git commit: NIFI-271 checkpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index ca68725..3486875 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -257,7 +257,7 @@ public class TestStandardProcessSession {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
}
-
+
private void assertDisabled(final InputStream inputStream) {
try {
inputStream.read();
@@ -289,8 +289,8 @@ public class TestStandardProcessSession {
} catch (final Exception ex) {
Assert.assertEquals(FlowFileAccessException.class, ex.getClass());
}
- }
-
+ }
+
@Test
public void testAppendAfterSessionClosesStream() throws IOException {
final ContentClaim claim = contentRepo.create(false);
@@ -355,7 +355,7 @@ public class TestStandardProcessSession {
});
assertDisabled(inputStreamHolder.get());
assertDisabled(outputStreamHolder.get());
- }
+ }
@Test
public void testWriteAfterSessionClosesStream() throws IOException {
@@ -426,7 +426,6 @@ public class TestStandardProcessSession {
assertEquals(0, provenanceRepo.getEvents(0L, 100000).size());
}
-
@Test
public void testProvenanceEventsEmittedForForkIfNotRemoved() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
@@ -466,59 +465,59 @@ public class TestStandardProcessSession {
@Test
public void testUpdateAttributesThenJoin() throws IOException {
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
- .id(1L)
- .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
- .entryDate(System.currentTimeMillis())
- .build();
-
+ .id(1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-111111111111")
+ .entryDate(System.currentTimeMillis())
+ .build();
+
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
- .id(2L)
- .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
- .entryDate(System.currentTimeMillis())
- .build();
-
+ .id(2L)
+ .addAttribute("uuid", "22222222-2222-2222-2222-222222222222")
+ .entryDate(System.currentTimeMillis())
+ .build();
+
flowFileQueue.put(flowFileRecord1);
flowFileQueue.put(flowFileRecord2);
-
+
FlowFile ff1 = session.get();
FlowFile ff2 = session.get();
ff1 = session.putAttribute(ff1, "index", "1");
ff2 = session.putAttribute(ff2, "index", "2");
-
+
final List<FlowFile> parents = new ArrayList<>(2);
parents.add(ff1);
parents.add(ff2);
-
+
final FlowFile child = session.create(parents);
-
+
final Relationship rel = new Relationship.Builder().name("A").build();
-
+
session.transfer(ff1, rel);
session.transfer(ff2, rel);
session.transfer(child, rel);
-
+
session.commit();
-
+
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 1000);
// We should have a JOIN and 2 ATTRIBUTE_MODIFIED's
assertEquals(3, events.size());
-
+
int joinCount = 0;
int ff1UpdateCount = 0;
int ff2UpdateCount = 0;
-
- for ( final ProvenanceEventRecord event : events ) {
+
+ for (final ProvenanceEventRecord event : events) {
switch (event.getEventType()) {
case JOIN:
assertEquals(child.getAttribute("uuid"), event.getFlowFileUuid());
joinCount++;
break;
case ATTRIBUTES_MODIFIED:
- if ( event.getFlowFileUuid().equals(ff1.getAttribute("uuid")) ) {
+ if (event.getFlowFileUuid().equals(ff1.getAttribute("uuid"))) {
ff1UpdateCount++;
- } else if ( event.getFlowFileUuid().equals(ff2.getAttribute("uuid")) ) {
+ } else if (event.getFlowFileUuid().equals(ff2.getAttribute("uuid"))) {
ff2UpdateCount++;
} else {
Assert.fail("Got ATTRIBUTE_MODIFIED for wrong FlowFile: " + event.getFlowFileUuid());
@@ -528,14 +527,14 @@ public class TestStandardProcessSession {
Assert.fail("Unexpected event type: " + event);
}
}
-
+
assertEquals(1, joinCount);
assertEquals(1, ff1UpdateCount);
assertEquals(1, ff2UpdateCount);
-
+
assertEquals(1, joinCount);
}
-
+
@Test
public void testForkOneToOneReported() throws IOException {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
@@ -845,34 +844,34 @@ public class TestStandardProcessSession {
@Test
public void testContentNotFoundExceptionThrownWhenUnableToReadDataOffsetTooLarge() {
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "container";
- }
-
- @Override
- public String getSection() {
- return "section";
- }
-
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- }).build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new ContentClaim() {
+ @Override
+ public int compareTo(ContentClaim arg0) {
+ return 0;
+ }
+
+ @Override
+ public String getId() {
+ return "0";
+ }
+
+ @Override
+ public String getContainer() {
+ return "container";
+ }
+
+ @Override
+ public String getSection() {
+ return "section";
+ }
+
+ @Override
+ public boolean isLossTolerant() {
+ return true;
+ }
+ }).build();
flowFileQueue.put(flowFileRecord);
FlowFile ff1 = session.get();
@@ -885,35 +884,35 @@ public class TestStandardProcessSession {
session.commit();
final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder()
- .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
- .entryDate(System.currentTimeMillis())
- .contentClaim(new ContentClaim() {
- @Override
- public int compareTo(ContentClaim arg0) {
- return 0;
- }
-
- @Override
- public String getId() {
- return "0";
- }
-
- @Override
- public String getContainer() {
- return "container";
- }
-
- @Override
- public String getSection() {
- return "section";
- }
-
- @Override
- public boolean isLossTolerant() {
- return true;
- }
- })
- .contentClaimOffset(1000L).size(1L).build();
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .contentClaim(new ContentClaim() {
+ @Override
+ public int compareTo(ContentClaim arg0) {
+ return 0;
+ }
+
+ @Override
+ public String getId() {
+ return "0";
+ }
+
+ @Override
+ public String getContainer() {
+ return "container";
+ }
+
+ @Override
+ public String getSection() {
+ return "section";
+ }
+
+ @Override
+ public boolean isLossTolerant() {
+ return true;
+ }
+ })
+ .contentClaimOffset(1000L).size(1L).build();
flowFileQueue.put(flowFileRecord2);
// attempt to read the data.
@@ -974,21 +973,20 @@ public class TestStandardProcessSession {
}
}
-
@Test
public void testCreateEmitted() throws IOException {
FlowFile newFlowFile = session.create();
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
-
+
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
-
+
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.CREATE, event.getEventType());
}
-
+
@Test
public void testContentModifiedNotEmittedForCreate() throws IOException {
FlowFile newFlowFile = session.create();
@@ -999,23 +997,23 @@ public class TestStandardProcessSession {
});
session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
session.commit();
-
+
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
-
+
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.CREATE, event.getEventType());
}
-
+
@Test
public void testContentModifiedEmittedAndNotAttributesModified() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
- .id(1L)
- .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
- .build();
+ .id(1L)
+ .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+ .build();
this.flowFileQueue.put(flowFile);
-
+
FlowFile existingFlowFile = session.get();
existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() {
@Override
@@ -1025,38 +1023,36 @@ public class TestStandardProcessSession {
existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
session.commit();
-
+
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
-
+
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType());
}
-
+
@Test
public void testAttributesModifiedEmitted() throws IOException {
final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
- .id(1L)
- .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
- .build();
+ .id(1L)
+ .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+ .build();
this.flowFileQueue.put(flowFile);
-
+
FlowFile existingFlowFile = session.get();
existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a");
session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
session.commit();
-
+
final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
assertFalse(events.isEmpty());
assertEquals(1, events.size());
-
+
final ProvenanceEventRecord event = events.get(0);
assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType());
}
-
-
-
+
private static class MockFlowFileRepository implements FlowFileRepository {
private final AtomicLong idGenerator = new AtomicLong(0L);
@@ -1123,7 +1119,7 @@ public class TestStandardProcessSession {
@Override
public void shutdown() {
}
-
+
public Set<ContentClaim> getExistingClaims() {
final Set<ContentClaim> claims = new HashSet<>();
@@ -1146,7 +1142,7 @@ public class TestStandardProcessSession {
if (Files.exists(parent) == false) {
Files.createDirectories(parent);
}
- Files.createFile(getPath(claim));
+ Files.createFile(getPath(claim));
return claim;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
index 7fef706..acd9993 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/StandardControllerServiceProviderTest.java
@@ -27,45 +27,45 @@ import org.junit.BeforeClass;
import org.junit.Test;
public class StandardControllerServiceProviderTest {
-
+
private ControllerService proxied;
private ControllerService implementation;
-
+
@BeforeClass
public static void setupSuite() throws Exception {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, StandardFlowServiceTest.class.getResource("/conf/nifi.properties").getFile());
NiFiProperties properties = NiFiProperties.getInstance();
NarClassLoaders.load(properties);
- ExtensionManager.discoverExtensions();
+ ExtensionManager.discoverExtensions();
}
@Before
public void setup() throws Exception {
- String id = "id";
- String clazz = "org.apache.nifi.controller.service.util.TestControllerService";
- ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null);
- ControllerServiceNode node = provider.createControllerService(clazz,id,true);
- proxied = node.getProxiedControllerService();
- implementation = node.getControllerServiceImplementation();
+ String id = "id";
+ String clazz = "org.apache.nifi.controller.service.util.TestControllerService";
+ ControllerServiceProvider provider = new StandardControllerServiceProvider(null, null);
+ ControllerServiceNode node = provider.createControllerService(clazz, id, true);
+ proxied = node.getProxiedControllerService();
+ implementation = node.getControllerServiceImplementation();
}
-
- @Test (expected=UnsupportedOperationException.class)
- public void testCallProxiedOnPropertyModified() {
- proxied.onPropertyModified(null, "oldValue", "newValue");
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCallProxiedOnPropertyModified() {
+ proxied.onPropertyModified(null, "oldValue", "newValue");
}
-
+
@Test
- public void testCallImplementationOnPropertyModified() {
- implementation.onPropertyModified(null, "oldValue", "newValue");
+ public void testCallImplementationOnPropertyModified() {
+ implementation.onPropertyModified(null, "oldValue", "newValue");
}
-
- @Test (expected=UnsupportedOperationException.class)
- public void testCallProxiedInitialized() throws InitializationException {
- proxied.initialize(null);
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void testCallProxiedInitialized() throws InitializationException {
+ proxied.initialize(null);
}
-
+
@Test
- public void testCallImplementationInitialized() throws InitializationException {
- implementation.initialize(null);
+ public void testCallImplementationInitialized() throws InitializationException {
+ implementation.initialize(null);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
index 3dc1752..03aca7e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -54,7 +54,7 @@ public class TestStandardControllerServiceProvider {
return null;
}
}).when(scheduler).enableControllerService(Mockito.any(ControllerServiceNode.class));
-
+
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
@@ -64,55 +64,54 @@ public class TestStandardControllerServiceProvider {
return null;
}
}).when(scheduler).disableControllerService(Mockito.any(ControllerServiceNode.class));
-
+
return scheduler;
}
-
+
@Test
public void testDisableControllerService() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
-
+
final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false);
provider.enableControllerService(serviceNode);
provider.disableControllerService(serviceNode);
}
-
+
@Test
public void testEnableDisableWithReference() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
-
+
final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false);
final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false);
-
+
serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
-
+
try {
provider.enableControllerService(serviceNodeA);
Assert.fail("Was able to enable Service A but Service B is disabled.");
} catch (final IllegalStateException expected) {
}
-
+
provider.enableControllerService(serviceNodeB);
provider.enableControllerService(serviceNodeA);
-
+
try {
provider.disableControllerService(serviceNodeB);
Assert.fail("Was able to disable Service B but Service A is enabled and references B");
} catch (final IllegalStateException expected) {
}
-
+
provider.disableControllerService(serviceNodeA);
provider.disableControllerService(serviceNodeB);
}
-
-
+
@Test
public void testEnableReferencingServicesGraph() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
-
+
// build a graph of controller services with dependencies as such:
//
// A -> B -> D
@@ -125,31 +124,29 @@ public class TestStandardControllerServiceProvider {
// So we have to verify that if D is enabled, when we enable its referencing services,
// we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
// until B is first enabled so ensure that we enable B first.
-
final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
-
+
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
-
+
provider.enableControllerService(serviceNode4);
provider.enableReferencingServices(serviceNode4);
-
+
assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
}
-
-
+
@Test
public void testStartStopReferencingComponents() {
final ProcessScheduler scheduler = createScheduler();
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler, null);
-
+
// build a graph of reporting tasks and controller services with dependencies as such:
//
// Processor P1 -> A -> B -> D
@@ -162,12 +159,11 @@ public class TestStandardControllerServiceProvider {
// So we have to verify that if D is enabled, when we enable its referencing services,
// we enable C and B, even if we attempt to enable C before B... i.e., if we try to enable C, we cannot do so
// until B is first enabled so ensure that we enable B first.
-
final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
-
+
final ProcessGroup mockProcessGroup = Mockito.mock(ProcessGroup.class);
Mockito.doAnswer(new Answer<Object>() {
@Override
@@ -178,7 +174,7 @@ public class TestStandardControllerServiceProvider {
return null;
}
}).when(mockProcessGroup).startProcessor(Mockito.any(ProcessorNode.class));
-
+
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable {
@@ -188,36 +184,36 @@ public class TestStandardControllerServiceProvider {
return null;
}
}).when(mockProcessGroup).stopProcessor(Mockito.any(ProcessorNode.class));
-
+
final String id1 = UUID.randomUUID().toString();
final ProcessorNode procNodeA = new StandardProcessorNode(new DummyProcessor(), id1,
new StandardValidationContextFactory(provider), scheduler, provider);
procNodeA.getProcessor().initialize(new StandardProcessorInitializationContext(id1, null, provider));
procNodeA.setProperty(DummyProcessor.SERVICE.getName(), "1");
procNodeA.setProcessGroup(mockProcessGroup);
-
+
final String id2 = UUID.randomUUID().toString();
- final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(),id2,
+ final ProcessorNode procNodeB = new StandardProcessorNode(new DummyProcessor(), id2,
new StandardValidationContextFactory(provider), scheduler, provider);
procNodeB.getProcessor().initialize(new StandardProcessorInitializationContext(id2, null, provider));
procNodeB.setProperty(DummyProcessor.SERVICE.getName(), "3");
procNodeB.setProcessGroup(mockProcessGroup);
-
+
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
-
+
provider.enableControllerService(serviceNode4);
provider.enableReferencingServices(serviceNode4);
provider.scheduleReferencingComponents(serviceNode4);
-
+
assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
assertTrue(procNodeA.isRunning());
assertTrue(procNodeB.isRunning());
-
+
// stop processors and verify results.
provider.unscheduleReferencingComponents(serviceNode4);
assertFalse(procNodeA.isRunning());
@@ -225,18 +221,17 @@ public class TestStandardControllerServiceProvider {
assertEquals(ControllerServiceState.ENABLED, serviceNode3.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode2.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode1.getState());
-
+
provider.disableReferencingServices(serviceNode4);
assertEquals(ControllerServiceState.DISABLED, serviceNode3.getState());
assertEquals(ControllerServiceState.DISABLED, serviceNode2.getState());
assertEquals(ControllerServiceState.DISABLED, serviceNode1.getState());
assertEquals(ControllerServiceState.ENABLED, serviceNode4.getState());
-
+
provider.disableControllerService(serviceNode4);
assertEquals(ControllerServiceState.DISABLED, serviceNode4.getState());
}
-
-
+
@Test
public void testOrderingOfServices() {
final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(null, null);
@@ -248,7 +243,7 @@ public class TestStandardControllerServiceProvider {
final Map<String, ControllerServiceNode> nodeMap = new LinkedHashMap<>();
nodeMap.put("1", serviceNode1);
nodeMap.put("2", serviceNode2);
-
+
List<List<ControllerServiceNode>> branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
assertEquals(2, branches.size());
List<ControllerServiceNode> ordered = branches.get(0);
@@ -257,11 +252,11 @@ public class TestStandardControllerServiceProvider {
assertTrue(ordered.get(1) == serviceNode1);
assertEquals(1, branches.get(1).size());
assertTrue(branches.get(1).get(0) == serviceNode2);
-
+
nodeMap.clear();
nodeMap.put("2", serviceNode2);
nodeMap.put("1", serviceNode1);
-
+
branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
assertEquals(2, branches.size());
ordered = branches.get(1);
@@ -270,20 +265,20 @@ public class TestStandardControllerServiceProvider {
assertTrue(ordered.get(1) == serviceNode1);
assertEquals(1, branches.get(0).size());
assertTrue(branches.get(0).get(0) == serviceNode2);
-
+
// add circular dependency on self.
nodeMap.clear();
serviceNode1.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "1");
nodeMap.put("1", serviceNode1);
nodeMap.put("2", serviceNode2);
-
+
branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
assertEquals(2, branches.size());
ordered = branches.get(0);
assertEquals(2, ordered.size());
assertTrue(ordered.get(0) == serviceNode2);
assertTrue(ordered.get(1) == serviceNode1);
-
+
nodeMap.clear();
nodeMap.put("2", serviceNode2);
nodeMap.put("1", serviceNode1);
@@ -293,7 +288,7 @@ public class TestStandardControllerServiceProvider {
assertEquals(2, ordered.size());
assertTrue(ordered.get(0) == serviceNode2);
assertTrue(ordered.get(1) == serviceNode1);
-
+
// add circular dependency once removed. In this case, we won't actually be able to enable these because of the
// circular dependency because they will never be valid because they will always depend on a disabled service.
// But we want to ensure that the method returns successfully without throwing a StackOverflowException or anything
@@ -310,7 +305,7 @@ public class TestStandardControllerServiceProvider {
assertEquals(2, ordered.size());
assertTrue(ordered.get(0) == serviceNode3);
assertTrue(ordered.get(1) == serviceNode1);
-
+
nodeMap.clear();
nodeMap.put("3", serviceNode3);
nodeMap.put("1", serviceNode1);
@@ -320,8 +315,7 @@ public class TestStandardControllerServiceProvider {
assertEquals(2, ordered.size());
assertTrue(ordered.get(0) == serviceNode3);
assertTrue(ordered.get(1) == serviceNode1);
-
-
+
// Add multiple completely disparate branches.
nodeMap.clear();
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
@@ -333,7 +327,7 @@ public class TestStandardControllerServiceProvider {
nodeMap.put("3", serviceNode3);
nodeMap.put("4", serviceNode4);
nodeMap.put("5", serviceNode5);
-
+
branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
assertEquals(5, branches.size());
@@ -341,21 +335,21 @@ public class TestStandardControllerServiceProvider {
assertEquals(2, ordered.size());
assertTrue(ordered.get(0) == serviceNode2);
assertTrue(ordered.get(1) == serviceNode1);
-
+
assertEquals(1, branches.get(1).size());
assertTrue(branches.get(1).get(0) == serviceNode2);
-
+
ordered = branches.get(2);
assertEquals(2, ordered.size());
assertTrue(ordered.get(0) == serviceNode4);
assertTrue(ordered.get(1) == serviceNode3);
-
+
assertEquals(1, branches.get(3).size());
assertTrue(branches.get(3).get(0) == serviceNode4);
-
+
assertEquals(1, branches.get(4).size());
assertTrue(branches.get(4).get(0) == serviceNode5);
-
+
// create 2 branches both dependent on the same service
nodeMap.clear();
serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
@@ -363,19 +357,19 @@ public class TestStandardControllerServiceProvider {
nodeMap.put("1", serviceNode1);
nodeMap.put("2", serviceNode2);
nodeMap.put("3", serviceNode3);
-
+
branches = StandardControllerServiceProvider.determineEnablingOrder(nodeMap);
assertEquals(3, branches.size());
-
+
ordered = branches.get(0);
assertEquals(2, ordered.size());
assertTrue(ordered.get(0) == serviceNode2);
assertTrue(ordered.get(1) == serviceNode1);
-
+
ordered = branches.get(1);
assertEquals(1, ordered.size());
assertTrue(ordered.get(0) == serviceNode2);
-
+
ordered = branches.get(2);
assertEquals(2, ordered.size());
assertTrue(ordered.get(0) == serviceNode2);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
index 615e172..13898a5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/DummyProcessor.java
@@ -29,19 +29,18 @@ import org.apache.nifi.processor.exception.ProcessException;
public class DummyProcessor extends AbstractProcessor {
public static final PropertyDescriptor SERVICE = new PropertyDescriptor.Builder()
- .name("Controller Service")
- .identifiesControllerService(ControllerService.class)
- .required(true)
- .build();
-
-
+ .name("Controller Service")
+ .identifiesControllerService(ControllerService.class)
+ .required(true)
+ .build();
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(SERVICE);
return descriptors;
}
-
+
@Override
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
index 4918468..f93184b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
@@ -26,18 +26,17 @@ import org.apache.nifi.controller.ControllerService;
public class ServiceA extends AbstractControllerService {
public static final PropertyDescriptor OTHER_SERVICE = new PropertyDescriptor.Builder()
- .name("Other Service")
- .identifiesControllerService(ControllerService.class)
- .required(true)
- .build();
-
+ .name("Other Service")
+ .identifiesControllerService(ControllerService.class)
+ .required(true)
+ .build();
+
public static final PropertyDescriptor OTHER_SERVICE_2 = new PropertyDescriptor.Builder()
- .name("Other Service 2")
- .identifiesControllerService(ControllerService.class)
- .required(false)
- .build();
+ .name("Other Service 2")
+ .identifiesControllerService(ControllerService.class)
+ .required(false)
+ .build();
-
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -45,5 +44,5 @@ public class ServiceA extends AbstractControllerService {
descriptors.add(OTHER_SERVICE_2);
return descriptors;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java
index 95200a0..65ef13f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/util/TestControllerService.java
@@ -28,34 +28,34 @@ import org.apache.nifi.reporting.InitializationException;
public class TestControllerService implements ControllerService {
- @Override
- public Collection<ValidationResult> validate(ValidationContext context) {
- return null;
- }
-
- @Override
- public PropertyDescriptor getPropertyDescriptor(String name) {
- return null;
- }
-
- @Override
- public void onPropertyModified(PropertyDescriptor descriptor,
- String oldValue, String newValue) {
- }
-
- @Override
- public List<PropertyDescriptor> getPropertyDescriptors() {
- return null;
- }
-
- @Override
- public String getIdentifier() {
- return null;
- }
-
- @Override
- public void initialize(ControllerServiceInitializationContext context)
- throws InitializationException {
- }
+ @Override
+ public Collection<ValidationResult> validate(ValidationContext context) {
+ return null;
+ }
+
+ @Override
+ public PropertyDescriptor getPropertyDescriptor(String name) {
+ return null;
+ }
+
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor,
+ String oldValue, String newValue) {
+ }
+
+ @Override
+ public List<PropertyDescriptor> getPropertyDescriptors() {
+ return null;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return null;
+ }
+
+ @Override
+ public void initialize(ControllerServiceInitializationContext context)
+ throws InitializationException {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
index a0bf30d..be40e90 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/processor/TestStandardPropertyValue.java
@@ -165,9 +165,9 @@ public class TestStandardPropertyValue {
@Override
public String getControllerServiceName(String serviceIdentifier) {
- return null;
+ return null;
}
-
+
@Override
public boolean isControllerServiceEnabling(String serviceIdentifier) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml
index 46a1aca..0406ed6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/pom.xml
@@ -32,7 +32,7 @@
<artifactId>nifi-properties</artifactId>
<scope>compile</scope>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-documentation</artifactId>
<scope>compile</scope>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
index 590797c..c1bdf97 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/BootstrapListener.java
@@ -47,166 +47,165 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BootstrapListener {
- private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);
-
- private final NiFi nifi;
- private final int bootstrapPort;
- private final String secretKey;
-
- private volatile Listener listener;
- private volatile ServerSocket serverSocket;
-
-
- public BootstrapListener(final NiFi nifi, final int bootstrapPort) {
- this.nifi = nifi;
- this.bootstrapPort = bootstrapPort;
- secretKey = UUID.randomUUID().toString();
- }
-
- public void start() throws IOException {
- logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);
-
- serverSocket = new ServerSocket();
- serverSocket.bind(new InetSocketAddress("localhost", 0));
- serverSocket.setSoTimeout(2000);
-
- final int localPort = serverSocket.getLocalPort();
- logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);
-
- listener = new Listener(serverSocket);
- final Thread listenThread = new Thread(listener);
- listenThread.setDaemon(true);
- listenThread.setName("Listen to Bootstrap");
- listenThread.start();
-
- logger.debug("Notifying Bootstrap that local port is {}", localPort);
- try (final Socket socket = new Socket()) {
- socket.setSoTimeout(60000);
- socket.connect(new InetSocketAddress("localhost", bootstrapPort));
- socket.setSoTimeout(60000);
-
- final OutputStream out = socket.getOutputStream();
- out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
- out.flush();
-
- logger.debug("Awaiting response from Bootstrap...");
- final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- final String response = reader.readLine();
- if ("OK".equals(response)) {
- logger.info("Successfully initiated communication with Bootstrap");
- } else {
- logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi");
- }
- }
- }
-
-
- public void stop() {
- if (listener != null) {
- listener.stop();
- }
- }
-
- private class Listener implements Runnable {
- private final ServerSocket serverSocket;
- private final ExecutorService executor;
- private volatile boolean stopped = false;
-
- public Listener(final ServerSocket serverSocket) {
- this.serverSocket = serverSocket;
- this.executor = Executors.newFixedThreadPool(2);
- }
-
- public void stop() {
- stopped = true;
-
- executor.shutdownNow();
-
- try {
- serverSocket.close();
- } catch (final IOException ioe) {
- // nothing to really do here. we could log this, but it would just become
- // confusing in the logs, as we're shutting down and there's no real benefit
- }
- }
-
- @Override
- public void run() {
- while (!stopped) {
- try {
- final Socket socket;
- try {
- logger.debug("Listening for Bootstrap Requests");
- socket = serverSocket.accept();
- } catch (final SocketTimeoutException ste) {
- if ( stopped ) {
- return;
- }
-
- continue;
- } catch (final IOException ioe) {
- if ( stopped ) {
- return;
- }
-
- throw ioe;
- }
-
- logger.debug("Received connection from Bootstrap");
- socket.setSoTimeout(5000);
-
- executor.submit(new Runnable() {
- @Override
- public void run() {
- try {
- final BootstrapRequest request = readRequest(socket.getInputStream());
- final BootstrapRequest.RequestType requestType = request.getRequestType();
-
- switch (requestType) {
- case PING:
- logger.debug("Received PING request from Bootstrap; responding");
- echoPing(socket.getOutputStream());
- logger.debug("Responded to PING request from Bootstrap");
- break;
- case SHUTDOWN:
- logger.info("Received SHUTDOWN request from Bootstrap");
- echoShutdown(socket.getOutputStream());
- nifi.shutdownHook();
- return;
- case DUMP:
- logger.info("Received DUMP request from Bootstrap");
- writeDump(socket.getOutputStream());
- break;
- }
- } catch (final Throwable t) {
- logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
- } finally {
- try {
- socket.close();
- } catch (final IOException ioe) {
- logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString());
- }
- }
- }
- });
- } catch (final Throwable t) {
- logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
- }
- }
- }
- }
-
-
- private static void writeDump(final OutputStream out) throws IOException {
+
+ private static final Logger logger = LoggerFactory.getLogger(BootstrapListener.class);
+
+ private final NiFi nifi;
+ private final int bootstrapPort;
+ private final String secretKey;
+
+ private volatile Listener listener;
+ private volatile ServerSocket serverSocket;
+
+ public BootstrapListener(final NiFi nifi, final int bootstrapPort) {
+ this.nifi = nifi;
+ this.bootstrapPort = bootstrapPort;
+ secretKey = UUID.randomUUID().toString();
+ }
+
+ public void start() throws IOException {
+ logger.debug("Starting Bootstrap Listener to communicate with Bootstrap Port {}", bootstrapPort);
+
+ serverSocket = new ServerSocket();
+ serverSocket.bind(new InetSocketAddress("localhost", 0));
+ serverSocket.setSoTimeout(2000);
+
+ final int localPort = serverSocket.getLocalPort();
+ logger.info("Started Bootstrap Listener, Listening for incoming requests on port {}", localPort);
+
+ listener = new Listener(serverSocket);
+ final Thread listenThread = new Thread(listener);
+ listenThread.setDaemon(true);
+ listenThread.setName("Listen to Bootstrap");
+ listenThread.start();
+
+ logger.debug("Notifying Bootstrap that local port is {}", localPort);
+ try (final Socket socket = new Socket()) {
+ socket.setSoTimeout(60000);
+ socket.connect(new InetSocketAddress("localhost", bootstrapPort));
+ socket.setSoTimeout(60000);
+
+ final OutputStream out = socket.getOutputStream();
+ out.write(("PORT " + localPort + " " + secretKey + "\n").getBytes(StandardCharsets.UTF_8));
+ out.flush();
+
+ logger.debug("Awaiting response from Bootstrap...");
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ final String response = reader.readLine();
+ if ("OK".equals(response)) {
+ logger.info("Successfully initiated communication with Bootstrap");
+ } else {
+ logger.error("Failed to communicate with Bootstrap. Bootstrap may be unable to issue or receive commands from NiFi");
+ }
+ }
+ }
+
+ public void stop() {
+ if (listener != null) {
+ listener.stop();
+ }
+ }
+
+ private class Listener implements Runnable {
+
+ private final ServerSocket serverSocket;
+ private final ExecutorService executor;
+ private volatile boolean stopped = false;
+
+ public Listener(final ServerSocket serverSocket) {
+ this.serverSocket = serverSocket;
+ this.executor = Executors.newFixedThreadPool(2);
+ }
+
+ public void stop() {
+ stopped = true;
+
+ executor.shutdownNow();
+
+ try {
+ serverSocket.close();
+ } catch (final IOException ioe) {
+ // nothing to really do here. we could log this, but it would just become
+ // confusing in the logs, as we're shutting down and there's no real benefit
+ }
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ try {
+ final Socket socket;
+ try {
+ logger.debug("Listening for Bootstrap Requests");
+ socket = serverSocket.accept();
+ } catch (final SocketTimeoutException ste) {
+ if (stopped) {
+ return;
+ }
+
+ continue;
+ } catch (final IOException ioe) {
+ if (stopped) {
+ return;
+ }
+
+ throw ioe;
+ }
+
+ logger.debug("Received connection from Bootstrap");
+ socket.setSoTimeout(5000);
+
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ final BootstrapRequest request = readRequest(socket.getInputStream());
+ final BootstrapRequest.RequestType requestType = request.getRequestType();
+
+ switch (requestType) {
+ case PING:
+ logger.debug("Received PING request from Bootstrap; responding");
+ echoPing(socket.getOutputStream());
+ logger.debug("Responded to PING request from Bootstrap");
+ break;
+ case SHUTDOWN:
+ logger.info("Received SHUTDOWN request from Bootstrap");
+ echoShutdown(socket.getOutputStream());
+ nifi.shutdownHook();
+ return;
+ case DUMP:
+ logger.info("Received DUMP request from Bootstrap");
+ writeDump(socket.getOutputStream());
+ break;
+ }
+ } catch (final Throwable t) {
+ logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
+ } finally {
+ try {
+ socket.close();
+ } catch (final IOException ioe) {
+ logger.warn("Failed to close socket to Bootstrap due to {}", ioe.toString());
+ }
+ }
+ }
+ });
+ } catch (final Throwable t) {
+ logger.error("Failed to process request from Bootstrap due to " + t.toString(), t);
+ }
+ }
+ }
+ }
+
+ private static void writeDump(final OutputStream out) throws IOException {
final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
-
+
final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();
-
+
final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length);
- for ( final ThreadInfo info : infos ) {
+ for (final ThreadInfo info : infos) {
sortedInfos.add(info);
}
Collections.sort(sortedInfos, new Comparator<ThreadInfo>() {
@@ -215,14 +214,14 @@ public class BootstrapListener {
return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase());
}
});
-
+
final StringBuilder sb = new StringBuilder();
- for ( final ThreadInfo info : sortedInfos ) {
+ for (final ThreadInfo info : sortedInfos) {
sb.append("\n");
sb.append("\"").append(info.getThreadName()).append("\" Id=");
sb.append(info.getThreadId()).append(" ");
sb.append(info.getThreadState().toString()).append(" ");
-
+
switch (info.getThreadState()) {
case BLOCKED:
case TIMED_WAITING:
@@ -233,66 +232,66 @@ public class BootstrapListener {
default:
break;
}
-
+
if (info.isSuspended()) {
sb.append(" (suspended)");
}
- if ( info.isInNative() ) {
+ if (info.isInNative()) {
sb.append(" (in native code)");
}
-
- if ( deadlockedThreadIds != null && deadlockedThreadIds.length > 0 ) {
- for ( final long id : deadlockedThreadIds ) {
- if ( id == info.getThreadId() ) {
+
+ if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
+ for (final long id : deadlockedThreadIds) {
+ if (id == info.getThreadId()) {
sb.append(" ** DEADLOCKED THREAD **");
}
}
}
- if ( monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0 ) {
- for ( final long id : monitorDeadlockThreadIds ) {
- if ( id == info.getThreadId() ) {
+ if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
+ for (final long id : monitorDeadlockThreadIds) {
+ if (id == info.getThreadId()) {
sb.append(" ** MONITOR-DEADLOCKED THREAD **");
}
}
}
final StackTraceElement[] stackTraces = info.getStackTrace();
- for ( final StackTraceElement element : stackTraces ) {
+ for (final StackTraceElement element : stackTraces) {
sb.append("\n\tat ").append(element);
-
+
final MonitorInfo[] monitors = info.getLockedMonitors();
- for ( final MonitorInfo monitor : monitors ) {
- if ( monitor.getLockedStackFrame().equals(element) ) {
+ for (final MonitorInfo monitor : monitors) {
+ if (monitor.getLockedStackFrame().equals(element)) {
sb.append("\n\t- waiting on ").append(monitor);
}
}
}
-
+
final LockInfo[] lockInfos = info.getLockedSynchronizers();
- if ( lockInfos.length > 0 ) {
+ if (lockInfos.length > 0) {
sb.append("\n\t");
sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
- for ( final LockInfo lockInfo : lockInfos ) {
+ for (final LockInfo lockInfo : lockInfos) {
sb.append("\n\t- ").append(lockInfo.toString());
}
}
-
+
sb.append("\n");
}
-
+
if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
sb.append("\n\nDEADLOCK DETECTED!");
sb.append("\nThe following thread IDs are deadlocked:");
- for ( final long id : deadlockedThreadIds ) {
+ for (final long id : deadlockedThreadIds) {
sb.append("\n").append(id);
}
}
- if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
+ if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
sb.append("\n\nMONITOR DEADLOCK DETECTED!");
sb.append("\nThe following thread IDs are deadlocked:");
- for ( final long id : monitorDeadlockThreadIds ) {
+ for (final long id : monitorDeadlockThreadIds) {
sb.append("\n").append(id);
}
}
@@ -300,79 +299,79 @@ public class BootstrapListener {
writer.write(sb.toString());
writer.flush();
}
-
- private void echoPing(final OutputStream out) throws IOException {
- out.write("PING\n".getBytes(StandardCharsets.UTF_8));
- out.flush();
- }
-
- private void echoShutdown(final OutputStream out) throws IOException {
- out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
- out.flush();
- }
-
-
- @SuppressWarnings("resource") // we don't want to close the stream, as the caller will do that
+
+ private void echoPing(final OutputStream out) throws IOException {
+ out.write("PING\n".getBytes(StandardCharsets.UTF_8));
+ out.flush();
+ }
+
+ private void echoShutdown(final OutputStream out) throws IOException {
+ out.write("SHUTDOWN\n".getBytes(StandardCharsets.UTF_8));
+ out.flush();
+ }
+
+ @SuppressWarnings("resource") // we don't want to close the stream, as the caller will do that
private BootstrapRequest readRequest(final InputStream in) throws IOException {
- // We want to ensure that we don't try to read data from an InputStream directly
- // by a BufferedReader because any user on the system could open a socket and send
- // a multi-gigabyte file without any new lines in order to crash the NiFi instance
- // (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance).
- // So we will limit the Input Stream to only 4 KB, which should be plenty for any request.
- final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096);
- final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn));
-
- final String line = reader.readLine();
- final String[] splits = line.split(" ");
- if ( splits.length < 1 ) {
- throw new IOException("Received invalid request from Bootstrap: " + line);
- }
-
- final String requestType = splits[0];
- final String[] args;
- if ( splits.length == 1 ) {
- throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType);
- } else if ( splits.length == 2 ) {
- args = new String[0];
- } else {
- args = Arrays.copyOfRange(splits, 2, splits.length);
- }
-
- final String requestKey = splits[1];
- if ( !secretKey.equals(requestKey) ) {
- throw new IOException("Received invalid Secret Key for request type " + requestType);
- }
-
- try {
- return new BootstrapRequest(requestType, args);
- } catch (final Exception e) {
- throw new IOException("Received invalid request from Bootstrap; request type = " + requestType);
- }
- }
-
-
- private static class BootstrapRequest {
- public static enum RequestType {
- SHUTDOWN,
- DUMP,
- PING;
- }
-
- private final RequestType requestType;
- private final String[] args;
-
- public BootstrapRequest(final String request, final String[] args) {
- this.requestType = RequestType.valueOf(request);
- this.args = args;
- }
-
- public RequestType getRequestType() {
- return requestType;
- }
-
- @SuppressWarnings("unused")
+ // We want to ensure that we don't try to read data from an InputStream directly
+ // by a BufferedReader because any user on the system could open a socket and send
+ // a multi-gigabyte file without any new lines in order to crash the NiFi instance
+ // (or at least cause OutOfMemoryErrors, which can wreak havoc on the running instance).
+ // So we will limit the Input Stream to only 4 KB, which should be plenty for any request.
+ final LimitingInputStream limitingIn = new LimitingInputStream(in, 4096);
+ final BufferedReader reader = new BufferedReader(new InputStreamReader(limitingIn));
+
+ final String line = reader.readLine();
+ final String[] splits = line.split(" ");
+ if (splits.length < 1) {
+ throw new IOException("Received invalid request from Bootstrap: " + line);
+ }
+
+ final String requestType = splits[0];
+ final String[] args;
+ if (splits.length == 1) {
+ throw new IOException("Received invalid request from Bootstrap; request did not have a secret key; request type = " + requestType);
+ } else if (splits.length == 2) {
+ args = new String[0];
+ } else {
+ args = Arrays.copyOfRange(splits, 2, splits.length);
+ }
+
+ final String requestKey = splits[1];
+ if (!secretKey.equals(requestKey)) {
+ throw new IOException("Received invalid Secret Key for request type " + requestType);
+ }
+
+ try {
+ return new BootstrapRequest(requestType, args);
+ } catch (final Exception e) {
+ throw new IOException("Received invalid request from Bootstrap; request type = " + requestType);
+ }
+ }
+
+ private static class BootstrapRequest {
+
+ public static enum RequestType {
+
+ SHUTDOWN,
+ DUMP,
+ PING;
+ }
+
+ private final RequestType requestType;
+ private final String[] args;
+
+ public BootstrapRequest(final String request, final String[] args) {
+ this.requestType = RequestType.valueOf(request);
+ this.args = args;
+ }
+
+ public RequestType getRequestType() {
+ return requestType;
+ }
+
+ @SuppressWarnings("unused")
public String[] getArgs() {
- return args;
- }
- }
+ return args;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
index e166f8e..ef2377f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/NiFi.java
@@ -47,11 +47,12 @@ public class NiFi {
private static final Logger logger = LoggerFactory.getLogger(NiFi.class);
private final NiFiServer nifiServer;
private final BootstrapListener bootstrapListener;
-
+
public static final String BOOTSTRAP_PORT_PROPERTY = "nifi.bootstrap.listen.port";
private volatile boolean shutdown = false;
- public NiFi(final NiFiProperties properties) throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+ public NiFi(final NiFiProperties properties)
+ throws ClassNotFoundException, IOException, NoSuchMethodException, InstantiationException, IllegalAccessException, IllegalArgumentException, InvocationTargetException {
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
@@ -70,24 +71,24 @@ public class NiFi {
}));
final String bootstrapPort = System.getProperty(BOOTSTRAP_PORT_PROPERTY);
- if ( bootstrapPort != null ) {
- try {
- final int port = Integer.parseInt(bootstrapPort);
-
- if (port < 1 || port > 65535) {
- throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
- }
-
- bootstrapListener = new BootstrapListener(this, port);
- bootstrapListener.start();
- } catch (final NumberFormatException nfe) {
- throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
- }
+ if (bootstrapPort != null) {
+ try {
+ final int port = Integer.parseInt(bootstrapPort);
+
+ if (port < 1 || port > 65535) {
+ throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
+ }
+
+ bootstrapListener = new BootstrapListener(this, port);
+ bootstrapListener.start();
+ } catch (final NumberFormatException nfe) {
+ throw new RuntimeException("Failed to start NiFi because system property '" + BOOTSTRAP_PORT_PROPERTY + "' is not a valid integer in the range 1 - 65535");
+ }
} else {
- logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
- bootstrapListener = null;
+ logger.info("NiFi started without Bootstrap Port information provided; will not listen for requests from Bootstrap");
+ bootstrapListener = null;
}
-
+
// delete the web working dir - if the application does not start successfully
// the web app directories might be in an invalid state. when this happens
// jetty will not attempt to re-extract the war into the directory. by removing
@@ -118,7 +119,7 @@ public class NiFi {
// discover the extensions
ExtensionManager.discoverExtensions();
ExtensionManager.logClassLoaderMapping();
-
+
DocGenerator.generate(properties);
// load the server from the framework classloader
@@ -129,27 +130,27 @@ public class NiFi {
final long startTime = System.nanoTime();
nifiServer = (NiFiServer) jettyConstructor.newInstance(properties);
nifiServer.setExtensionMapping(extensionMapping);
-
- if ( shutdown ) {
- logger.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
+
+ if (shutdown) {
+ logger.info("NiFi has been shutdown via NiFi Bootstrap. Will not start Controller");
} else {
- nifiServer.start();
-
- final long endTime = System.nanoTime();
- logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds.");
+ nifiServer.start();
+
+ final long endTime = System.nanoTime();
+ logger.info("Controller initialization took " + (endTime - startTime) + " nanoseconds.");
}
}
protected void shutdownHook() {
try {
- this.shutdown = true;
-
+ this.shutdown = true;
+
logger.info("Initiating shutdown of Jetty web server...");
if (nifiServer != null) {
nifiServer.stop();
}
if (bootstrapListener != null) {
- bootstrapListener.stop();
+ bootstrapListener.stop();
}
logger.info("Jetty web server shutdown completed (nicely or otherwise).");
} catch (final Throwable t) {
@@ -164,10 +165,10 @@ public class NiFi {
final int minRequiredOccurrences = 25;
final int maxOccurrencesOutOfRange = 15;
final AtomicLong lastTriggerMillis = new AtomicLong(System.currentTimeMillis());
-
+
final ScheduledExecutorService service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
-
+
@Override
public Thread newThread(final Runnable r) {
final Thread t = defaultFactory.newThread(r);
@@ -176,7 +177,7 @@ public class NiFi {
return t;
}
});
-
+
final AtomicInteger occurrencesOutOfRange = new AtomicInteger(0);
final AtomicInteger occurences = new AtomicInteger(0);
final Runnable command = new Runnable() {
@@ -202,7 +203,8 @@ public class NiFi {
service.shutdownNow();
if (occurences.get() < minRequiredOccurrences || occurrencesOutOfRange.get() > maxOccurrencesOutOfRange) {
- logger.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
+ logger.warn("NiFi has detected that this box is not responding within the expected timing interval, which may cause "
+ + "Processors to be scheduled erratically. Please see the NiFi documentation for more information.");
}
}
};
@@ -213,7 +215,7 @@ public class NiFi {
/**
* Main entry point of the application.
*
- * @param args
+ * @param args things which are ignored
*/
public static void main(String[] args) {
logger.info("Launching NiFi...");