You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/04/22 23:13:27 UTC
[37/49] incubator-nifi git commit: NIFI-271 checkpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
index 59837c1..ab10b01 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterManagerProtocolSenderImplTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.nifi.cluster.protocol.impl;
-import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
-import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderImpl;
import java.io.IOException;
import java.net.InetAddress;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -33,11 +31,13 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.ServerSocketConfiguration;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.junit.After;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -45,48 +45,48 @@ import org.mockito.stubbing.Answer;
* @author unattributed
*/
public class ClusterManagerProtocolSenderImplTest {
-
+
private InetAddress address;
-
+
private int port;
-
+
private SocketProtocolListener listener;
-
+
private ClusterManagerProtocolSenderImpl sender;
-
+
private ProtocolHandler mockHandler;
-
+
@Before
public void setup() throws IOException {
-
+
address = InetAddress.getLocalHost();
ServerSocketConfiguration serverSocketConfiguration = new ServerSocketConfiguration();
serverSocketConfiguration.setSocketTimeout(2000);
mockHandler = mock(ProtocolHandler.class);
-
+
ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
+
listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext);
listener.addHandler(mockHandler);
listener.start();
-
+
port = listener.getPort();
-
+
SocketConfiguration socketConfiguration = new SocketConfiguration();
sender = new ClusterManagerProtocolSenderImpl(socketConfiguration, protocolContext);
}
-
+
@After
public void teardown() throws IOException {
- if(listener.isRunning()) {
+ if (listener.isRunning()) {
listener.stop();
}
}
-
+
@Test
public void testRequestFlow() throws Exception {
-
+
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new FlowResponseMessage());
FlowRequestMessage request = new FlowRequestMessage();
@@ -94,10 +94,10 @@ public class ClusterManagerProtocolSenderImplTest {
FlowResponseMessage response = sender.requestFlow(request);
assertNotNull(response);
}
-
+
@Test
public void testRequestFlowWithBadResponseMessage() throws Exception {
-
+
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
FlowRequestMessage request = new FlowRequestMessage();
@@ -105,16 +105,17 @@ public class ClusterManagerProtocolSenderImplTest {
try {
sender.requestFlow(request);
fail("failed to throw exception");
- } catch(ProtocolException pe) {}
-
+ } catch (ProtocolException pe) {
+ }
+
}
-
+
@Test
public void testRequestFlowDelayedResponse() throws Exception {
-
+
final int time = 250;
sender.getSocketConfiguration().setSocketTimeout(time);
-
+
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenAnswer(new Answer<FlowResponseMessage>() {
@Override
@@ -128,7 +129,8 @@ public class ClusterManagerProtocolSenderImplTest {
try {
sender.requestFlow(request);
fail("failed to throw exception");
- } catch(ProtocolException pe) {}
+ } catch (ProtocolException pe) {
+ }
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java
index e3703e2..8df6dcb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceDiscoveryTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.nifi.cluster.protocol.impl;
-import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
@@ -32,7 +31,8 @@ import org.apache.nifi.cluster.protocol.message.ServiceBroadcastMessage;
import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
import org.apache.nifi.io.socket.multicast.MulticastUtils;
import org.junit.After;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -41,60 +41,60 @@ import org.junit.Test;
* @author unattributed
*/
public class ClusterServiceDiscoveryTest {
-
+
private ClusterServiceDiscovery discovery;
-
+
private String serviceName;
-
+
private MulticastSocket socket;
-
+
private InetSocketAddress multicastAddress;
-
+
private MulticastConfiguration configuration;
-
+
private ProtocolContext protocolContext;
-
+
@Before
public void setup() throws Exception {
serviceName = "some-service";
multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
configuration = new MulticastConfiguration();
-
+
protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
+
discovery = new ClusterServiceDiscovery(serviceName, multicastAddress, configuration, protocolContext);
discovery.start();
socket = MulticastUtils.createMulticastSocket(multicastAddress.getPort(), configuration);
}
-
+
@After
public void teardown() throws IOException {
try {
- if(discovery.isRunning()) {
+ if (discovery.isRunning()) {
discovery.stop();
}
} finally {
MulticastUtils.closeQuietly(socket);
}
}
-
+
@Ignore("Test needs to be fixed. Requires an active network connection")
@Test
public void testGetAddressOnStartup() {
assertNull(discovery.getService());
- }
-
+ }
+
@Ignore("This test has an NPE after ignoring another...perhaps has a bad inter-test dependency")
@Test
public void testGetAddressAfterBroadcast() throws Exception {
-
+
ServiceBroadcastMessage msg = new ServiceBroadcastMessage();
msg.setServiceName("some-service");
msg.setAddress("3.3.3.3");
msg.setPort(1234);
-
+
// marshal message to output stream
ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -102,22 +102,22 @@ public class ClusterServiceDiscoveryTest {
byte[] requestPacketBytes = baos.toByteArray();
DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, multicastAddress);
socket.send(packet);
-
+
Thread.sleep(250);
-
+
InetSocketAddress updatedAddress = discovery.getService().getServiceAddress();
assertEquals("some-service", discovery.getServiceName());
assertEquals("3.3.3.3", updatedAddress.getHostName());
assertEquals(1234, updatedAddress.getPort());
-
+
}
-
+
@Ignore("Test needs to be fixed. Requires an active network connection")
@Test
public void testBadBroadcastMessage() throws Exception {
-
+
ProtocolMessage msg = new PingMessage();
-
+
// marshal message to output stream
ProtocolMessageMarshaller marshaller = protocolContext.createMarshaller();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -125,11 +125,11 @@ public class ClusterServiceDiscoveryTest {
byte[] requestPacketBytes = baos.toByteArray();
DatagramPacket packet = new DatagramPacket(requestPacketBytes, requestPacketBytes.length, multicastAddress);
socket.send(packet);
-
+
Thread.sleep(250);
-
+
assertNull(discovery.getService());
-
+
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
index b1c156b..ea40150 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServiceLocatorTest.java
@@ -16,104 +16,104 @@
*/
package org.apache.nifi.cluster.protocol.impl;
-import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
-import org.apache.nifi.cluster.protocol.impl.ClusterServiceDiscovery;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import org.mockito.stubbing.OngoingStubbing;
public class ClusterServiceLocatorTest {
-
+
private ClusterServiceDiscovery mockServiceDiscovery;
-
+
private int fixedPort;
-
+
private DiscoverableService fixedService;
-
+
private ClusterServiceLocator serviceDiscoveryLocator;
-
+
private ClusterServiceLocator serviceDiscoveryFixedPortLocator;
-
+
private ClusterServiceLocator fixedServiceLocator;
-
+
@Before
public void setup() throws Exception {
-
+
fixedPort = 1;
mockServiceDiscovery = mock(ClusterServiceDiscovery.class);
fixedService = new DiscoverableServiceImpl("some-service", InetSocketAddress.createUnresolved("some-host", 20));
-
+
serviceDiscoveryLocator = new ClusterServiceLocator(mockServiceDiscovery);
serviceDiscoveryFixedPortLocator = new ClusterServiceLocator(mockServiceDiscovery, fixedPort);
fixedServiceLocator = new ClusterServiceLocator(fixedService);
-
+
}
-
+
@Test
public void getServiceWhenServiceDiscoveryNotStarted() {
assertNull(serviceDiscoveryLocator.getService());
}
-
+
@Test
public void getServiceWhenServiceDiscoveryFixedPortNotStarted() {
assertNull(serviceDiscoveryLocator.getService());
}
-
+
@Test
public void getServiceWhenFixedServiceNotStarted() {
assertEquals(fixedService, fixedServiceLocator.getService());
}
-
+
@Test
public void getServiceNotOnFirstAttempt() {
-
+
ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
config.setNumAttempts(2);
config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
config.setTimeBetweenAttempts(1);
-
+
serviceDiscoveryLocator.setAttemptsConfig(config);
-
+
OngoingStubbing<DiscoverableService> stubbing = null;
- for(int i = 0; i < config.getNumAttempts() - 1; i++) {
- if(stubbing == null) {
+ for (int i = 0; i < config.getNumAttempts() - 1; i++) {
+ if (stubbing == null) {
stubbing = when(mockServiceDiscovery.getService()).thenReturn(null);
} else {
stubbing.thenReturn(null);
}
}
stubbing.thenReturn(fixedService);
-
+
assertEquals(fixedService, serviceDiscoveryLocator.getService());
-
+
}
-
+
@Test
public void getServiceNotOnFirstAttemptWithFixedPort() {
-
+
ClusterServiceLocator.AttemptsConfig config = new ClusterServiceLocator.AttemptsConfig();
config.setNumAttempts(2);
config.setTimeBetweenAttempsUnit(TimeUnit.SECONDS);
config.setTimeBetweenAttempts(1);
-
+
serviceDiscoveryFixedPortLocator.setAttemptsConfig(config);
-
+
OngoingStubbing<DiscoverableService> stubbing = null;
- for(int i = 0; i < config.getNumAttempts() - 1; i++) {
- if(stubbing == null) {
+ for (int i = 0; i < config.getNumAttempts() - 1; i++) {
+ if (stubbing == null) {
stubbing = when(mockServiceDiscovery.getService()).thenReturn(null);
} else {
stubbing.thenReturn(null);
}
}
stubbing.thenReturn(fixedService);
-
+
InetSocketAddress resultAddress = InetSocketAddress.createUnresolved(fixedService.getServiceAddress().getHostName(), fixedPort);
DiscoverableService resultService = new DiscoverableServiceImpl(fixedService.getServiceName(), resultAddress);
assertEquals(resultService, serviceDiscoveryFixedPortLocator.getService());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
index ec1f26d..5e98397 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/ClusterServicesBroadcasterTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.nifi.cluster.protocol.impl;
-import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
-import org.apache.nifi.cluster.protocol.impl.MulticastProtocolListener;
import java.net.InetSocketAddress;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
@@ -30,7 +28,8 @@ import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.io.socket.multicast.DiscoverableServiceImpl;
import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
import org.junit.After;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@@ -39,68 +38,68 @@ import org.junit.Test;
* @author unattributed
*/
public class ClusterServicesBroadcasterTest {
-
+
private ClusterServicesBroadcaster broadcaster;
-
+
private MulticastProtocolListener listener;
-
+
private DummyProtocolHandler handler;
-
+
private InetSocketAddress multicastAddress;
-
+
private DiscoverableService broadcastedService;
private ProtocolContext protocolContext;
-
+
private MulticastConfiguration configuration;
-
+
@Before
public void setup() throws Exception {
broadcastedService = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", 11111));
-
+
multicastAddress = new InetSocketAddress("225.1.1.1", 22222);
-
+
configuration = new MulticastConfiguration();
-
+
protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
+
broadcaster = new ClusterServicesBroadcaster(multicastAddress, configuration, protocolContext, "500 ms");
broadcaster.addService(broadcastedService);
-
+
handler = new DummyProtocolHandler();
listener = new MulticastProtocolListener(5, multicastAddress, configuration, protocolContext);
listener.addHandler(handler);
}
-
+
@After
public void teardown() {
-
- if(broadcaster.isRunning()) {
+
+ if (broadcaster.isRunning()) {
broadcaster.stop();
}
-
+
try {
- if(listener.isRunning()) {
+ if (listener.isRunning()) {
listener.stop();
}
- } catch(Exception ex) {
+ } catch (Exception ex) {
ex.printStackTrace(System.out);
}
-
+
}
-
+
@Ignore("fails needs to be fixed")
@Test
public void testBroadcastReceived() throws Exception {
-
+
broadcaster.start();
listener.start();
-
+
Thread.sleep(1000);
-
+
listener.stop();
-
+
assertNotNull(handler.getProtocolMessage());
assertEquals(ProtocolMessage.MessageType.SERVICE_BROADCAST, handler.getProtocolMessage().getType());
final ServiceBroadcastMessage msg = (ServiceBroadcastMessage) handler.getProtocolMessage();
@@ -108,11 +107,11 @@ public class ClusterServicesBroadcasterTest {
assertEquals(broadcastedService.getServiceAddress().getHostName(), msg.getAddress());
assertEquals(broadcastedService.getServiceAddress().getPort(), msg.getPort());
}
-
+
private class DummyProtocolHandler implements ProtocolHandler {
private ProtocolMessage protocolMessage;
-
+
@Override
public boolean canHandle(ProtocolMessage msg) {
return true;
@@ -123,11 +122,11 @@ public class ClusterServicesBroadcasterTest {
this.protocolMessage = msg;
return null;
}
-
+
public ProtocolMessage getProtocolMessage() {
return protocolMessage;
}
-
+
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
index 4233d88..690d416 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/MulticastProtocolListenerTest.java
@@ -35,69 +35,66 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.multicast.MulticastConfiguration;
import org.apache.nifi.io.socket.multicast.MulticastUtils;
import org.junit.After;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-/**
- * @author unattributed
- */
public class MulticastProtocolListenerTest {
-
+
private MulticastProtocolListener listener;
-
+
private MulticastSocket socket;
-
+
private InetSocketAddress address;
-
+
private MulticastConfiguration configuration;
-
+
private ProtocolContext protocolContext;
-
+
@Before
public void setup() throws Exception {
address = new InetSocketAddress("226.1.1.1", 60000);
configuration = new MulticastConfiguration();
-
+
protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
+
listener = new MulticastProtocolListener(5, address, configuration, protocolContext);
listener.start();
socket = MulticastUtils.createMulticastSocket(address.getPort(), configuration);
}
-
+
@After
public void teardown() throws IOException {
try {
- if(listener.isRunning()) {
+ if (listener.isRunning()) {
listener.stop();
}
} finally {
MulticastUtils.closeQuietly(socket);
}
}
-
+
@Ignore("Test needs to be reworked. Fails if on a system without actiev network connection")
@Test
public void testBadRequest() throws Exception {
DelayedProtocolHandler handler = new DelayedProtocolHandler(0);
listener.addHandler(handler);
- DatagramPacket packet = new DatagramPacket(new byte[] {5}, 1, address);
+ DatagramPacket packet = new DatagramPacket(new byte[]{5}, 1, address);
socket.send(packet);
Thread.sleep(250);
assertEquals(0, handler.getMessages().size());
}
-
+
@Ignore("this test works sometimes and fails others - needs work to be reliable")
@Test
public void testRequest() throws Exception {
ReflexiveProtocolHandler handler = new ReflexiveProtocolHandler();
listener.addHandler(handler);
-
+
ProtocolMessage msg = new PingMessage();
MulticastProtocolMessage multicastMsg = new MulticastProtocolMessage("some-id", msg);
@@ -112,13 +109,13 @@ public class MulticastProtocolListenerTest {
Thread.sleep(250);
assertEquals(1, handler.getMessages().size());
assertEquals(msg.getType(), handler.getMessages().get(0).getType());
-
+
}
-
+
private class ReflexiveProtocolHandler implements ProtocolHandler {
-
+
private List<ProtocolMessage> messages = new ArrayList<>();
-
+
@Override
public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
messages.add(msg);
@@ -129,30 +126,30 @@ public class MulticastProtocolListenerTest {
public boolean canHandle(ProtocolMessage msg) {
return true;
}
-
+
public List<ProtocolMessage> getMessages() {
return messages;
}
-
+
}
private class DelayedProtocolHandler implements ProtocolHandler {
-
+
private int delay = 0;
-
+
private List<ProtocolMessage> messages = new ArrayList<>();
-
+
public DelayedProtocolHandler(int delay) {
this.delay = delay;
}
-
+
@Override
public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
try {
messages.add(msg);
Thread.sleep(delay);
return null;
- } catch(final InterruptedException ie) {
+ } catch (final InterruptedException ie) {
throw new ProtocolException(ie);
}
@@ -162,10 +159,10 @@ public class MulticastProtocolListenerTest {
public boolean canHandle(ProtocolMessage msg) {
return true;
}
-
+
public List<ProtocolMessage> getMessages() {
return messages;
}
-
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
index 1c5ba9e..19834ae 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/NodeProtocolSenderImplTest.java
@@ -16,9 +16,6 @@
*/
package org.apache.nifi.cluster.protocol.impl;
-import org.apache.nifi.cluster.protocol.impl.ClusterServiceLocator;
-import org.apache.nifi.cluster.protocol.impl.SocketProtocolListener;
-import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderImpl;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@@ -58,104 +55,102 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-/**
- * @author unattributed
- */
@Ignore("Randomly tests... probably timing-specific")
public class NodeProtocolSenderImplTest {
-
+
private SocketProtocolListener listener;
-
+
private NodeProtocolSenderImpl sender;
-
+
private DiscoverableService service;
-
+
private ServerSocketConfiguration serverSocketConfiguration;
-
+
private ClusterServiceLocator mockServiceLocator;
-
+
private ProtocolHandler mockHandler;
-
+
private NodeIdentifier nodeIdentifier;
-
+
@Before
public void setup() throws IOException {
-
+
serverSocketConfiguration = new ServerSocketConfiguration();
mockServiceLocator = mock(ClusterServiceLocator.class);
mockHandler = mock(ProtocolHandler.class);
-
+
nodeIdentifier = new NodeIdentifier("1", "localhost", 1234, "localhost", 5678);
-
+
ProtocolContext protocolContext = new JaxbProtocolContext(JaxbProtocolUtils.JAXB_CONTEXT);
-
+
listener = new SocketProtocolListener(5, 0, serverSocketConfiguration, protocolContext);
listener.setShutdownListenerSeconds(3);
listener.addHandler(mockHandler);
listener.start();
-
+
service = new DiscoverableServiceImpl("some-service", new InetSocketAddress("localhost", listener.getPort()));
-
+
SocketConfiguration socketConfiguration = new SocketConfiguration();
socketConfiguration.setReuseAddress(true);
sender = new NodeProtocolSenderImpl(mockServiceLocator, socketConfiguration, protocolContext);
}
-
+
@After
public void teardown() throws IOException {
- if(listener.isRunning()) {
+ if (listener.isRunning()) {
listener.stop();
}
}
-
+
@Test
public void testConnect() throws Exception {
-
+
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
ConnectionResponseMessage mockMessage = new ConnectionResponseMessage();
- mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier, new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
+ mockMessage.setConnectionResponse(new ConnectionResponse(nodeIdentifier,
+ new StandardDataFlow("flow".getBytes("UTF-8"), new byte[0], new byte[0]), false, null, null, UUID.randomUUID().toString()));
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(mockMessage);
-
+
ConnectionRequestMessage request = new ConnectionRequestMessage();
request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
ConnectionResponseMessage response = sender.requestConnection(request);
assertNotNull(response);
}
-
+
@Test(expected = UnknownServiceAddressException.class)
public void testConnectNoClusterManagerAddress() throws Exception {
-
+
when(mockServiceLocator.getService()).thenReturn(null);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new ConnectionResponseMessage());
-
+
ConnectionRequestMessage request = new ConnectionRequestMessage();
request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
sender.requestConnection(request);
fail("failed to throw exception");
}
-
+
@Test(expected = ProtocolException.class)
public void testConnectBadResponse() throws Exception {
-
+
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(new PingMessage());
-
+
ConnectionRequestMessage request = new ConnectionRequestMessage();
request.setConnectionRequest(new ConnectionRequest(nodeIdentifier));
-
+
sender.requestConnection(request);
fail("failed to throw exception");
-
+
}
-
+
@Test(expected = ProtocolException.class)
public void testConnectDelayedResponse() throws Exception {
-
+
final int time = 250;
sender.getSocketConfiguration().setSocketTimeout(time);
when(mockServiceLocator.getService()).thenReturn(service);
@@ -172,28 +167,28 @@ public class NodeProtocolSenderImplTest {
sender.requestConnection(request);
fail("failed to throw exception");
-
+
}
-
+
@Test
public void testHeartbeat() throws Exception {
-
+
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
+
HeartbeatMessage hb = new HeartbeatMessage();
- hb.setHeartbeat(new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, new byte[] {1, 2, 3}));
+ hb.setHeartbeat(new Heartbeat(new NodeIdentifier("id", "localhost", 3, "localhost", 4), false, false, new byte[]{1, 2, 3}));
sender.heartbeat(hb);
}
-
+
@Test
public void testNotifyControllerStartupFailure() throws Exception {
-
+
when(mockServiceLocator.getService()).thenReturn(service);
when(mockHandler.canHandle(any(ProtocolMessage.class))).thenReturn(Boolean.TRUE);
when(mockHandler.handle(any(ProtocolMessage.class))).thenReturn(null);
-
+
ControllerStartupFailureMessage msg = new ControllerStartupFailureMessage();
msg.setNodeId(new NodeIdentifier("some-id", "some-addr", 1, "some-addr", 1));
msg.setExceptionMessage("some exception");
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
index 4e3b932..803797b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/test/java/org/apache/nifi/cluster/protocol/impl/testutils/ReflexiveProtocolHandler.java
@@ -26,9 +26,9 @@ import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
* @author unattributed
*/
public class ReflexiveProtocolHandler implements ProtocolHandler {
-
+
private List<ProtocolMessage> messages = new ArrayList<>();
-
+
@Override
public ProtocolMessage handle(ProtocolMessage msg) throws ProtocolException {
messages.add(msg);
@@ -43,5 +43,5 @@ public class ReflexiveProtocolHandler implements ProtocolHandler {
public List<ProtocolMessage> getMessages() {
return messages;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml
index 7b28bbe..63c07c0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/pom.xml
@@ -43,8 +43,8 @@
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-site-to-site-client</artifactId>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-site-to-site-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
index 3d5c75d..7c40092 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/connectable/Connectable.java
@@ -46,7 +46,7 @@ public interface Connectable extends Triggerable {
/**
* Returns the ProcessorRelationship whose name is given
*
- * @param relationshipName
+ * @param relationshipName name
* @return a ProcessorRelationship whose name is given, or <code>null</code>
* if none exists
*/
@@ -69,7 +69,7 @@ public interface Connectable extends Triggerable {
/**
*
- * @param connection
+ * @param connection to remove
* @throws IllegalStateException if the given Connection is not registered
* to <code>this</code>.
*/
@@ -79,8 +79,8 @@ public interface Connectable extends Triggerable {
* Updates any internal state that depends on the given connection. The
* given connection will share the same ID as the old connection.
*
- * @param newConnection
- * @throws IllegalStateException
+ * @param newConnection new connection
+ * @throws IllegalStateException ise
*/
void updateConnection(Connection newConnection) throws IllegalStateException;
@@ -98,7 +98,7 @@ public interface Connectable extends Triggerable {
Set<Connection> getConnections();
/**
- * @param relationship
+ * @param relationship to get connections for
* @return a <code>Set</code> of all <code>Connection</code>s that contain
* the given relationship for which this <code>Connectable</code> is the
* source
@@ -106,16 +106,14 @@ public interface Connectable extends Triggerable {
Set<Connection> getConnections(Relationship relationship);
/**
- * Returns the position on the graph where this Connectable is located
- *
- * @return
+ * @return the position on the graph where this Connectable is located
*/
Position getPosition();
/**
* Updates this component's position on the graph
*
- * @param position
+ * @param position new position
*/
void setPosition(Position position);
@@ -127,7 +125,8 @@ public interface Connectable extends Triggerable {
/**
* Sets the name of this Connectable so that its name will be visible on the
* UI
- * @param name
+ *
+ * @param name new name
*/
void setName(String name);
@@ -138,31 +137,28 @@ public interface Connectable extends Triggerable {
/**
* Sets the comments of this Connectable.
- * @param comments
+ *
+ * @param comments of this Connectable
*/
void setComments(String comments);
/**
- * If true,
+ * @return If true,
* {@link #onTrigger(nifi.processor.ProcessContext, nifi.processor.ProcessSessionFactory)}
* should be called even when this Connectable has no FlowFiles queued for
* processing
- *
- * @return
*/
boolean isTriggerWhenEmpty();
/**
- * Returns the ProcessGroup to which this <code>Connectable</code> belongs
- *
- * @return
+ * @return the ProcessGroup to which this <code>Connectable</code> belongs
*/
ProcessGroup getProcessGroup();
/**
* Sets the new ProcessGroup to which this <code>Connectable</code> belongs
*
- * @param group
+ * @param group new group
*/
void setProcessGroup(ProcessGroup group);
@@ -177,15 +173,13 @@ public interface Connectable extends Triggerable {
boolean isAutoTerminated(Relationship relationship);
/**
- * Indicates whether flow file content made by this connectable must be
- * persisted
- *
- * @return
+ * @return Indicates whether flow file content made by this connectable must
+ * be persisted
*/
boolean isLossTolerant();
/**
- * @param lossTolerant
+ * @param lossTolerant true if it is
*/
void setLossTolerant(boolean lossTolerant);
@@ -195,41 +189,33 @@ public interface Connectable extends Triggerable {
ConnectableType getConnectableType();
/**
- * Returns the any validation errors for this connectable.
- *
- * @return
+ * @return any validation errors for this connectable
*/
Collection<ValidationResult> getValidationErrors();
/**
- * Returns the amount of time for which a FlowFile should be penalized when
+ * @param timeUnit unit over which to interpret the duration
+ * @return the amount of time for which a FlowFile should be penalized when
* {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called
- *
- * @param timeUnit
- * @return
*/
long getPenalizationPeriod(final TimeUnit timeUnit);
/**
- * Returns a string representation for which a FlowFile should be penalized
+ * @return a string representation for which a FlowFile should be penalized
* when {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called
- *
- * @return
*/
String getPenalizationPeriod();
/**
* @param timeUnit determines the unit of time to represent the yield
* period.
- * @return
+ * @return yield period
*/
long getYieldPeriod(TimeUnit timeUnit);
/**
- * returns the string representation for this Connectable's configured yield
+ * @return the string representation for this Connectable's configured yield
* period
- *
- * @return
*/
String getYieldPeriod();
@@ -238,14 +224,15 @@ public interface Connectable extends Triggerable {
* scheduled when the processor calls
* {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()}
*
- * @param yieldPeriod
+ * @param yieldPeriod new yield period
*/
void setYieldPeriod(String yieldPeriod);
/**
* Updates the amount of time that this Connectable will penalize FlowFiles
* when {@link ProcessSession#penalize(nifi.flowfile.FlowFile)} is called
- * @param penalizationPeriod
+ *
+ * @param penalizationPeriod new period
*/
void setPenalizationPeriod(String penalizationPeriod);
@@ -258,18 +245,14 @@ public interface Connectable extends Triggerable {
void yield();
/**
- * Returns the time in milliseconds since Epoch at which this Connectable
+ * @return the time in milliseconds since Epoch at which this Connectable
* should no longer yield its threads
- *
- * @return
*/
long getYieldExpiration();
/**
- * Specifies whether or not this component is considered side-effect free,
- * with respect to external systems.
- *
- * @return
+ * @return Specifies whether or not this component is considered side-effect free,
+ * with respect to external systems
*/
boolean isSideEffectFree();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index c44161f..978c612 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -150,16 +150,16 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
final PropertyDescriptor descriptor = component.getPropertyDescriptor(name);
String value = null;
if (!descriptor.isRequired() && (value = properties.remove(descriptor)) != null) {
-
- if ( descriptor.getControllerServiceDefinition() != null ) {
- if (value != null) {
+
+ if (descriptor.getControllerServiceDefinition() != null) {
+ if (value != null) {
final ControllerServiceNode oldNode = serviceProvider.getControllerServiceNode(value);
if (oldNode != null) {
oldNode.removeReference(this);
}
}
- }
-
+ }
+
component.onPropertyModified(descriptor, value, null);
return true;
}
@@ -261,12 +261,11 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
return true;
}
-
@Override
public Collection<ValidationResult> getValidationErrors() {
return getValidationErrors(Collections.<String>emptySet());
}
-
+
public Collection<ValidationResult> getValidationErrors(final Set<String> serviceIdentifiersNotToValidate) {
final List<ValidationResult> results = new ArrayList<>();
lock.lock();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
index e1d2dd4..50ba12a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractPort.java
@@ -294,8 +294,8 @@ public abstract class AbstractPort implements Port {
* Verify that removing this connection will not prevent this Port from
* still being connected via each relationship
*
- * @param connection
- * @return
+ * @param connection to test for removal
+ * @return true if can be removed
*/
private boolean canConnectionBeRemoved(final Connection connection) {
final Connectable source = connection.getSource();
@@ -368,11 +368,6 @@ public abstract class AbstractPort implements Port {
}
}
- /**
- * Indicates whether or not this Port is valid.
- *
- * @return
- */
@Override
public abstract boolean isValid();
@@ -399,18 +394,11 @@ public abstract class AbstractPort implements Port {
concurrentTaskCount.set(taskCount);
}
- /**
- * @return the number of tasks that may execute concurrently for this
- * processor
- */
@Override
public int getMaxConcurrentTasks() {
return concurrentTaskCount.get();
}
- /**
- *
- */
@Override
public void shutdown() {
scheduledState.set(ScheduledState.STOPPED);
@@ -450,13 +438,6 @@ public abstract class AbstractPort implements Port {
return type;
}
- /**
- * Updates the amount of time that this processor should avoid being
- * scheduled when the processor calls
- * {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()}
- *
- * @param yieldPeriod
- */
@Override
public void setYieldPeriod(final String yieldPeriod) {
final long yieldMillis = FormatUtils.getTimeDuration(requireNonNull(yieldPeriod), TimeUnit.MILLISECONDS);
@@ -466,9 +447,6 @@ public abstract class AbstractPort implements Port {
this.yieldPeriod.set(yieldPeriod);
}
- /**
- * @param schedulingPeriod
- */
@Override
public void setScheduldingPeriod(final String schedulingPeriod) {
final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
@@ -490,12 +468,6 @@ public abstract class AbstractPort implements Port {
return penalizationPeriod.get();
}
- /**
- * Causes the processor not to be scheduled for some period of time. This
- * duration can be obtained and set via the
- * {@link #getYieldPeriod(TimeUnit)} and
- * {@link #setYieldPeriod(long, TimeUnit)} methods.
- */
@Override
public void yield() {
final long yieldMillis = getYieldPeriod(TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
index 5b95524..8b2794d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ConfiguredComponent.java
@@ -55,9 +55,7 @@ public interface ConfiguredComponent {
boolean isValid();
/**
- * Returns the any validation errors for this connectable.
- *
- * @return
+ * @return the any validation errors for this connectable
*/
Collection<ValidationResult> getValidationErrors();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
index eee878e..f91fe8f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ContentAvailability.java
@@ -23,43 +23,33 @@ package org.apache.nifi.controller;
public interface ContentAvailability {
/**
- * Returns a boolean indicating whether or not the Input content is
+ * @return a boolean indicating whether or not the Input content is
* available
- *
- * @return
*/
boolean isInputAvailable();
/**
- * Returns a boolean indicating whether or not the Output content is
+ * @return a boolean indicating whether or not the Output content is
* available
- *
- * @return
*/
boolean isOutputAvailable();
/**
- * Returns <code>true</code> if the Input content is the same as the Output
+ * @return <code>true</code> if the Input content is the same as the Output
* content
- *
- * @return
*/
boolean isContentSame();
/**
- * Returns a boolean indicating whether or not the content is replayable. If
+ * @return a boolean indicating whether or not the content is replayable. If
* this returns <code>false</code>, the reason that replay is not available
- * can be determined by calling {@link #getReasonNotReplayable()}.
- *
- * @return
+ * can be determined by calling {@link #getReasonNotReplayable()}
*/
boolean isReplayable();
/**
- * Returns the reason that the content cannot be replayed, or
- * <code>null</code> if the content can be replayed.
- *
- * @return
+ * @return the reason that the content cannot be replayed, or
+ * <code>null</code> if the content can be replayed
*/
String getReasonNotReplayable();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index c3b6613..ee8d9b4 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -35,7 +35,7 @@ public interface ProcessScheduler {
* are annotated with the {@link OnScheduled} annotation. If the Processor
* is already scheduled to run, does nothing.
*
- * @param procNode
+ * @param procNode to start
* @throws IllegalStateException if the Processor is disabled
*/
void startProcessor(ProcessorNode procNode);
@@ -46,7 +46,8 @@ public interface ProcessScheduler {
* are annotated with the {@link OnUnscheduled} annotation. This does not
* interrupt any threads that are currently running within the given
* Processor. If the Processor is not scheduled to run, does nothing.
- * @param procNode
+ *
+ * @param procNode to stop
*/
void stopProcessor(ProcessorNode procNode);
@@ -54,7 +55,7 @@ public interface ProcessScheduler {
* Starts scheduling the given Port to run. If the Port is already scheduled
* to run, does nothing.
*
- * @param port
+ * @param port to start
*
* @throws IllegalStateException if the Port is disabled
*/
@@ -66,7 +67,7 @@ public interface ProcessScheduler {
* interrupt any threads that are currently running within the given Port.
* If the Port is not scheduled to run, does nothing.
*
- * @param port
+ * @param port to stop
*/
void stopPort(Port port);
@@ -74,7 +75,7 @@ public interface ProcessScheduler {
* Starts scheduling the given Funnel to run. If the funnel is already
* scheduled to run, does nothing.
*
- * @param funnel
+ * @param funnel to start
*
* @throws IllegalStateException if the Funnel is disabled
*/
@@ -85,7 +86,7 @@ public interface ProcessScheduler {
* threads that are currently running within the given funnel. If the funnel
* is not scheduled to run, does nothing.
*
- * @param funnel
+ * @param funnel to stop
*/
void stopFunnel(Funnel funnel);
@@ -102,27 +103,23 @@ public interface ProcessScheduler {
void disableProcessor(ProcessorNode procNode);
/**
- * Returns the number of threads currently active for the given
- * <code>Connectable</code>.
- *
- * @param scheduled
- * @return
+ * @param scheduled scheduled component
+ * @return the number of threads currently active for the given
+ * <code>Connectable</code>
*/
int getActiveThreadCount(Object scheduled);
/**
- * Returns a boolean indicating whether or not the given object is scheduled
+ * @param scheduled component to test
+ * @return a boolean indicating whether or not the given object is scheduled
* to run
- *
- * @param scheduled
- * @return
*/
boolean isScheduled(Object scheduled);
/**
* Registers a relevant event for an Event-Driven worker
*
- * @param worker
+ * @param worker to register
*/
void registerEvent(Connectable worker);
@@ -130,8 +127,8 @@ public interface ProcessScheduler {
* Notifies the ProcessScheduler of how many threads are available to use
* for the given {@link SchedulingStrategy}
*
- * @param strategy
- * @param maxThreadCount
+ * @param strategy scheduling strategy
+ * @param maxThreadCount max threads
*/
void setMaxThreadCount(SchedulingStrategy strategy, int maxThreadCount);
@@ -139,31 +136,36 @@ public interface ProcessScheduler {
* Notifies the Scheduler that it should stop scheduling the given component
* until its yield duration has expired
*
- * @param procNode
+ * @param procNode processor
*/
void yield(ProcessorNode procNode);
-
+
/**
* Stops scheduling the given Reporting Task to run
- * @param taskNode
+ *
+ * @param taskNode to unschedule
*/
void unschedule(ReportingTaskNode taskNode);
-
+
/**
* Begins scheduling the given Reporting Task to run
- * @param taskNode
+ *
+ * @param taskNode to schedule
*/
void schedule(ReportingTaskNode taskNode);
-
+
/**
- * Enables the Controller Service so that it can be used by Reporting Tasks and Processors
- * @param service
+ * Enables the Controller Service so that it can be used by Reporting Tasks
+ * and Processors
+ *
+ * @param service to enable
*/
void enableControllerService(ControllerServiceNode service);
-
+
/**
* Disables the Controller Service so that it can be updated
- * @param service
+ *
+ * @param service to disable
*/
void disableControllerService(ControllerServiceNode service);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
index 3189edd..66967ba 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessorNode.java
@@ -79,18 +79,20 @@ public abstract class ProcessorNode extends AbstractConfiguredComponent implemen
public abstract void setStyle(Map<String, String> style);
/**
- * Returns the number of threads (concurrent tasks) currently being used by this Processor
- * @return
+ * @return the number of threads (concurrent tasks) currently being used by
+ * this Processor
*/
public abstract int getActiveThreadCount();
-
+
/**
* Verifies that this Processor can be started if the provided set of
- * services are enabled. This is introduced because we need to verify that all components
- * can be started before starting any of them. In order to do that, we need to know that this
- * component can be started if the given services are enabled, as we will then enable the given
- * services before starting this component.
- * @param ignoredReferences
+ * services are enabled. This is introduced because we need to verify that
+ * all components can be started before starting any of them. In order to do
+ * that, we need to know that this component can be started if the given
+ * services are enabled, as we will then enable the given services before
+ * starting this component.
+ *
+ * @param ignoredReferences to ignore
*/
public abstract void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index c932f30..c2adf01 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -40,7 +40,8 @@ public interface ReportingTaskNode extends ConfiguredComponent {
/**
* Updates how often the ReportingTask should be triggered to run
- * @param schedulingPeriod
+ *
+ * @param schedulingPeriod new period
*/
void setScheduldingPeriod(String schedulingPeriod);
@@ -53,43 +54,48 @@ public interface ReportingTaskNode extends ConfiguredComponent {
boolean isRunning();
/**
- * Returns the number of threads (concurrent tasks) currently being used by this ReportingTask
- * @return
+ * @return the number of threads (concurrent tasks) currently being used by
+ * this ReportingTask
*/
int getActiveThreadCount();
-
+
/**
- * Indicates the {@link ScheduledState} of this <code>ReportingTask</code>. A
- * value of stopped does NOT indicate that the <code>ReportingTask</code> has
- * no active threads, only that it is not currently scheduled to be given
- * any more threads. To determine whether or not the
+ * @return Indicates the {@link ScheduledState} of this <code>ReportingTask</code>.
+ * A value of stopped does NOT indicate that the <code>ReportingTask</code>
+ * has no active threads, only that it is not currently scheduled to be
+ * given any more threads. To determine whether or not the
* <code>ReportingTask</code> has any active threads, see
- * {@link ProcessScheduler#getActiveThreadCount(ReportingTask)}.
- *
- * @return
+ * {@link ProcessScheduler#getActiveThreadCount(ReportingTask)}
*/
ScheduledState getScheduledState();
-
+
void setScheduledState(ScheduledState state);
-
+
String getComments();
-
+
void setComments(String comment);
-
+
/**
* Verifies that this Reporting Task can be enabled if the provided set of
- * services are enabled. This is introduced because we need to verify that all components
- * can be started before starting any of them. In order to do that, we need to know that this
- * component can be started if the given services are enabled, as we will then enable the given
- * services before starting this component.
- * @param ignoredReferences
+ * services are enabled. This is introduced because we need to verify that
+ * all components can be started before starting any of them. In order to do
+ * that, we need to know that this component can be started if the given
+ * services are enabled, as we will then enable the given services before
+ * starting this component.
+ *
+ * @param ignoredReferences to ignore
*/
void verifyCanStart(Set<ControllerServiceNode> ignoredReferences);
-
+
void verifyCanStart();
+
void verifyCanStop();
+
void verifyCanDisable();
+
void verifyCanEnable();
+
void verifyCanDelete();
+
void verifyCanUpdate();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 59d2308..572f8d6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -212,7 +212,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
/**
* MUST be called with lock held
*
- * @return
+ * @return size of queue
*/
private QueueSize getQueueSize() {
final QueueSize unacknowledged = unacknowledgedSizeRef.get();
@@ -350,7 +350,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
/**
* MUST be called with either the read or write lock held
*
- * @return
+ * @return true if full
*/
private boolean determineIfFull() {
final long maxSize = maximumQueueObjectCount;
@@ -1011,7 +1011,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
if (record == null || record.isPenalized()) {
// not enough unpenalized records to pull. Put all records back and return
activeQueue.addAll(buffer);
- if ( record != null ) {
+ if (record != null) {
activeQueue.add(record);
}
return;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
index 3bdfd20..0effbaf 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFunnel.java
@@ -263,7 +263,7 @@ public class StandardFunnel implements Funnel {
/**
* Throws {@link UnsupportedOperationException}
*
- * @param name
+ * @param name new name
*/
@Override
public void setName(final String name) {
@@ -403,7 +403,7 @@ public class StandardFunnel implements Funnel {
* scheduled when the processor calls
* {@link nifi.processor.ProcessContext#yield() ProcessContext.yield()}
*
- * @param yieldPeriod
+ * @param yieldPeriod new period
*/
@Override
public void setYieldPeriod(final String yieldPeriod) {
@@ -414,9 +414,6 @@ public class StandardFunnel implements Funnel {
this.yieldPeriod.set(yieldPeriod);
}
- /**
- * @param schedulingPeriod
- */
@Override
public void setScheduldingPeriod(final String schedulingPeriod) {
final long schedulingNanos = FormatUtils.getTimeDuration(requireNonNull(schedulingPeriod), TimeUnit.NANOSECONDS);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
index 09479d5..a7118d4 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
@@ -25,7 +25,7 @@ import org.apache.nifi.components.ValidationContext;
public interface ValidationContextFactory {
ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData);
-
+
ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties, String annotationData);
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
index 0ff68b0..327be36 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceAlreadyExistsException.java
@@ -20,10 +20,6 @@ public class ControllerServiceAlreadyExistsException extends RuntimeException {
private static final long serialVersionUID = -544424320587059277L;
- /**
- * Constructs a default exception
- * @param id
- */
public ControllerServiceAlreadyExistsException(final String id) {
super("A Controller Service already exists with ID " + id);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
index 18cfcda..f2118d7 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
@@ -20,31 +20,18 @@ public class ControllerServiceInstantiationException extends RuntimeException {
private static final long serialVersionUID = -544424320587059277L;
- /**
- * Constructs a default exception
- */
public ControllerServiceInstantiationException() {
super();
}
- /**
- * @param message
- */
public ControllerServiceInstantiationException(String message) {
super(message);
}
- /**
- * @param cause
- */
public ControllerServiceInstantiationException(Throwable cause) {
super(cause);
}
- /**
- * @param message
- * @param cause
- */
public ControllerServiceInstantiationException(String message, Throwable cause) {
super(message, cause);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
index bb6f3f7..9f16798 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
@@ -20,84 +20,92 @@ import java.util.Set;
import org.apache.nifi.controller.ReportingTaskNode;
/**
- * A ReportingTaskProvider is responsible for providing management of, and access to, Reporting Tasks
+ * A ReportingTaskProvider is responsible for providing management of, and
+ * access to, Reporting Tasks
*/
public interface ReportingTaskProvider {
/**
* Creates a new instance of a reporting task
- *
- * @param type the type (fully qualified class name) of the reporting task to instantiate
+ *
+ * @param type the type (fully qualified class name) of the reporting task
+ * to instantiate
* @param id the identifier for the Reporting Task
- * @param firstTimeAdded whether or not this is the first time that the reporting task is being added
- * to the flow. I.e., this will be true only when the user adds the reporting task to the flow, not when
- * the flow is being restored after a restart of the software
- *
+ * @param firstTimeAdded whether or not this is the first time that the
+ * reporting task is being added to the flow. I.e., this will be true only
+ * when the user adds the reporting task to the flow, not when the flow is
+ * being restored after a restart of the software
+ *
* @return the ReportingTaskNode that is used to manage the reporting task
- *
- * @throws ReportingTaskInstantiationException if unable to create the Reporting Task
+ *
+ * @throws ReportingTaskInstantiationException if unable to create the
+ * Reporting Task
*/
ReportingTaskNode createReportingTask(String type, String id, boolean firstTimeAdded) throws ReportingTaskInstantiationException;
-
+
/**
- * Returns the reporting task that has the given identifier, or <code>null</code> if no reporting task
- * exists with that ID.
- *
- * @param identifier
- * @return
+ * @param identifier of node
+ * @return the reporting task that has the given identifier, or
+ * <code>null</code> if no reporting task exists with that ID
*/
ReportingTaskNode getReportingTaskNode(String identifier);
-
+
/**
- * Returns a Set of all Reporting Tasks that exist for this service provider.
- * @return
+ * @return a Set of all Reporting Tasks that exist for this service
+ * provider
*/
Set<ReportingTaskNode> getAllReportingTasks();
-
+
/**
* Removes the given reporting task from the flow
- *
+ *
* @param reportingTask
- *
- * @throws IllegalStateException if the reporting task cannot be removed because it is not stopped, or
- * if the reporting task is not known in the flow
+ *
+ * @throws IllegalStateException if the reporting task cannot be removed
+ * because it is not stopped, or if the reporting task is not known in the
+ * flow
*/
void removeReportingTask(ReportingTaskNode reportingTask);
-
+
/**
- * Begins scheduling the reporting task to run and invokes appropriate lifecycle methods
+ * Begins scheduling the reporting task to run and invokes appropriate
+ * lifecycle methods
+ *
* @param reportingTask
- *
- * @throws IllegalStateException if the ReportingTask's state is not STOPPED, or if the Reporting Task has active
- * threads, or if the ReportingTask is not valid
+ *
+ * @throws IllegalStateException if the ReportingTask's state is not
+ * STOPPED, or if the Reporting Task has active threads, or if the
+ * ReportingTask is not valid
*/
void startReportingTask(ReportingTaskNode reportingTask);
-
+
/**
- * Stops scheduling the reporting task to run and invokes appropriate lifecycle methods
+ * Stops scheduling the reporting task to run and invokes appropriate
+ * lifecycle methods
+ *
* @param reportingTask
- *
+ *
* @throws IllegalStateException if the ReportingTask's state is not RUNNING
*/
void stopReportingTask(ReportingTaskNode reportingTask);
-
-
+
/**
* Enables the reporting task to be scheduled to run
+ *
* @param reportingTask
- *
- * @throws IllegalStateException if the ReportingTask's state is not DISABLED
+ *
+ * @throws IllegalStateException if the ReportingTask's state is not
+ * DISABLED
*/
void enableReportingTask(ReportingTaskNode reportingTask);
-
-
+
/**
* Disables the ability to schedule the reporting task to run
- *
+ *
* @param reportingTask
- *
- * @throws IllegalStateException if the ReportingTask's state is not STOPPED, or if the Reporting Task has active
- * threads
+ *
+ * @throws IllegalStateException if the ReportingTask's state is not
+ * STOPPED, or if the Reporting Task has active threads
*/
void disableReportingTask(ReportingTaskNode reportingTask);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
index 2eb3caf..560dc05 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/repository/FlowFileEventRepository.java
@@ -19,24 +19,19 @@ package org.apache.nifi.controller.repository;
import java.io.Closeable;
import java.io.IOException;
-/**
- *
- * @author none
- */
public interface FlowFileEventRepository extends Closeable {
/**
* Updates the repository to include a new FlowFile processing event
*
- * @param event
- * @throws java.io.IOException
+ * @param event new event
+ * @throws java.io.IOException ioe
*/
void updateRepository(FlowFileEvent event) throws IOException;
/**
- * Returns a report of processing activity since the given time
- * @param sinceEpochMillis
- * @return
+ * @param sinceEpochMillis age of report
+ * @return a report of processing activity since the given time
*/
RepositoryStatusReport reportTransferEvents(long sinceEpochMillis);
@@ -44,7 +39,7 @@ public interface FlowFileEventRepository extends Closeable {
* Causes any flow file events of the given entry age in epoch milliseconds
* or older to be purged from the repository
*
- * @param cutoffEpochMilliseconds
+ * @param cutoffEpochMilliseconds cutoff
*/
void purgeTransferEvents(long cutoffEpochMilliseconds);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 50bf469..10933db 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -24,45 +24,52 @@ import org.apache.nifi.controller.ControllerService;
public interface ControllerServiceNode extends ConfiguredComponent {
ControllerService getProxiedControllerService();
-
+
ControllerService getControllerServiceImplementation();
ControllerServiceState getState();
+
void setState(ControllerServiceState state);
-
+
ControllerServiceReference getReferences();
void addReference(ConfiguredComponent referringComponent);
void removeReference(ConfiguredComponent referringComponent);
-
+
void setComments(String comment);
+
String getComments();
-
+
void verifyCanEnable();
+
void verifyCanDisable();
-
+
/**
- * Verifies that this Controller Service can be disabled if the provided set of
- * services are also disabled. This is introduced because we can have an instance
- * where A references B, which references C, which references A and we want
- * to disable service C. In this case, the cycle needs to not cause us to fail,
- * so we want to verify that C can be disabled if A and B also are.
- *
- * @param ignoredReferences
+ * Verifies that this Controller Service can be disabled if the provided set
+ * of services are also disabled. This is introduced because we can have an
+ * instance where A references B, which references C, which references A and
+ * we want to disable service C. In this case, the cycle needs to not cause
+ * us to fail, so we want to verify that C can be disabled if A and B also
+ * are.
+ *
+ * @param ignoredReferences references to ignore
*/
void verifyCanDisable(Set<ControllerServiceNode> ignoredReferences);
-
+
/**
- * Verifies that this Controller Service can be enabled if the provided set of
- * services are also enabled. This is introduced because we can have an instance where
- * A reference B, which references C, which references A and we want to enable
- * Service A. In this case, the cycle needs to not cause us to fail, so we want to verify
- * that A can be enabled if A and B also are.
- * @param ignoredReferences
+ * Verifies that this Controller Service can be enabled if the provided set
+ * of services are also enabled. This is introduced because we can have an
+ * instance where A references B, which references C, which references A and
+ * we want to enable Service A. In this case, the cycle needs to not cause
+ * us to fail, so we want to verify that A can be enabled if B and C also
+ * are.
+ *
+ * @param ignoredReferences references to ignore
*/
void verifyCanEnable(Set<ControllerServiceNode> ignoredReferences);
-
+
void verifyCanDelete();
+
void verifyCanUpdate();
}